diff options
Diffstat (limited to 'glucometerutils/support/freestyle.py')
-rw-r--r-- | glucometerutils/support/freestyle.py | 147 |
1 files changed, 80 insertions, 67 deletions
diff --git a/glucometerutils/support/freestyle.py b/glucometerutils/support/freestyle.py index d48ac04..341e978 100644 --- a/glucometerutils/support/freestyle.py +++ b/glucometerutils/support/freestyle.py @@ -18,7 +18,7 @@ from typing import AnyStr, Callable, Iterator, List, Optional, Text, Tuple import construct from glucometerutils import exceptions -from glucometerutils.support import hiddevice, driver_base +from glucometerutils.support import driver_base, hiddevice _INIT_COMMAND = 0x01 _INIT_RESPONSE = 0x71 @@ -30,54 +30,66 @@ _ENCRYPTION_SETUP_COMMAND = 0x14 _ENCRYPTION_SETUP_RESPONSE = 0x33 _ALWAYS_UNENCRYPTED_MESSAGES = ( - _INIT_COMMAND, 0x04, 0x05, 0x06, 0x0c, 0x0d, - _ENCRYPTION_SETUP_COMMAND, 0x15, - _ENCRYPTION_SETUP_RESPONSE, 0x34, 0x35, + _INIT_COMMAND, + 0x04, + 0x05, + 0x06, + 0x0C, + 0x0D, + _ENCRYPTION_SETUP_COMMAND, + 0x15, + _ENCRYPTION_SETUP_RESPONSE, + 0x34, + 0x35, _INIT_RESPONSE, _KEEPALIVE_RESPONSE, ) + def _create_matcher(message_type, content): # type: (int, Optional[bytes]) -> Callable[[Tuple[int, bytes]], bool] def _matcher(message): - return ( - message[0] == message_type and - (content is None or content == message[1])) + return message[0] == message_type and (content is None or content == message[1]) return _matcher -_is_init_reply = _create_matcher(_INIT_RESPONSE, b'\x01') + +_is_init_reply = _create_matcher(_INIT_RESPONSE, b"\x01") _is_keepalive_response = _create_matcher(_KEEPALIVE_RESPONSE, None) -_is_unknown_message_error = _create_matcher(_UNKNOWN_MESSAGE_RESPONSE, b'\x85') -_is_encryption_missing_error = _create_matcher( - _ENCRYPTION_SETUP_RESPONSE, b'\x15') -_is_encryption_setup_error = _create_matcher( - _ENCRYPTION_SETUP_RESPONSE, b'\x14') +_is_unknown_message_error = _create_matcher(_UNKNOWN_MESSAGE_RESPONSE, b"\x85") +_is_encryption_missing_error = _create_matcher(_ENCRYPTION_SETUP_RESPONSE, b"\x15") +_is_encryption_setup_error = _create_matcher(_ENCRYPTION_SETUP_RESPONSE, b"\x14") _FREESTYLE_MESSAGE = construct.Struct( - 'hid_report' / construct.Const(0, construct.Byte), - 'message_type' / construct.Byte, - 'command' / construct.Padded( + "hid_report" / construct.Const(0, construct.Byte), + "message_type" / construct.Byte, + "command" + / construct.Padded( 63, # command can only be up to 62 bytes, but one is used for length. - construct.Prefixed(construct.Byte, construct.GreedyBytes)), + construct.Prefixed(construct.Byte, construct.GreedyBytes), + ), ) _FREESTYLE_ENCRYPTED_MESSAGE = construct.Struct( - 'hid_report' / construct.Const(0, construct.Byte), - 'message_type' / construct.Byte, - 'command' / construct.Padded( + "hid_report" / construct.Const(0, construct.Byte), + "message_type" / construct.Byte, + "command" + / construct.Padded( 63, # command can only be up to 62 bytes, but one is used for length. - construct.GreedyBytes), + construct.GreedyBytes, + ), ) -_TEXT_COMPLETION_RE = re.compile(b'CMD (?:OK|Fail!)') +_TEXT_COMPLETION_RE = re.compile(b"CMD (?:OK|Fail!)") _TEXT_REPLY_FORMAT = re.compile( - b'^(?P<message>.*)CKSM:(?P<checksum>[0-9A-F]{8})\r\n' - b'CMD (?P<status>OK|Fail!)\r\n$', re.DOTALL) + b"^(?P<message>.*)CKSM:(?P<checksum>[0-9A-F]{8})\r\n" + b"CMD (?P<status>OK|Fail!)\r\n$", + re.DOTALL, +) _MULTIRECORDS_FORMAT = re.compile( - '^(?P<message>.+\r\n)(?P<count>[0-9]+),(?P<checksum>[0-9A-F]{8})\r\n$', - re.DOTALL) + "^(?P<message>.+\r\n)(?P<count>[0-9]+),(?P<checksum>[0-9A-F]{8})\r\n$", re.DOTALL +) def _verify_checksum(message, expected_checksum_hex): @@ -131,17 +143,18 @@ class FreeStyleHidDevice(hiddevice.HidDevice, driver_base.GlucometerDriver, ABC) TEXT_CMD = 0x60 TEXT_REPLY_CMD = 0x60 - USB_VENDOR_ID = 0x1a61 # type: int # Abbott Diabetes Care + USB_VENDOR_ID = 0x1A61 # type: int # Abbott Diabetes Care USB_PRODUCT_ID = None # type: int def connect(self): """Open connection to the device, starting the knocking sequence.""" - self._send_command(_INIT_COMMAND, b'') + self._send_command(_INIT_COMMAND, b"") response = self._read_response() if not _is_init_reply(response): raise exceptions.ConnectionFailed( - 'Connection error: unexpected message %02x:%s' % ( - response[0], response[1].hex())) + "Connection error: unexpected message %02x:%s" + % (response[0], response[1].hex()) + ) def disconnect(self): """Disconnect the device, nothing to be done.""" @@ -162,9 +175,10 @@ class FreeStyleHidDevice(hiddevice.HidDevice, driver_base.GlucometerDriver, ABC) meta_construct = _FREESTYLE_MESSAGE usb_packet = meta_construct.build( - {'message_type': message_type, 'command': command}) + {"message_type": message_type, "command": command} + ) - logging.debug('Sending packet: %r', usb_packet) + logging.debug("Sending packet: %r", usb_packet) self._write(usb_packet) def _read_response(self, encrypted=False): @@ -172,14 +186,14 @@ class FreeStyleHidDevice(hiddevice.HidDevice, driver_base.GlucometerDriver, ABC) """Read the response from the device and extracts it.""" usb_packet = self._read() - logging.debug('Read packet: %r', usb_packet) + logging.debug("Read packet: %r", usb_packet) assert usb_packet message_type = usb_packet[0] if not encrypted or message_type in _ALWAYS_UNENCRYPTED_MESSAGES: message_length = usb_packet[1] - message_content = usb_packet[2:2+message_length] + message_content = usb_packet[2 : 2 + message_length] else: message_content = usb_packet[1:] @@ -195,15 +209,13 @@ class FreeStyleHidDevice(hiddevice.HidDevice, driver_base.GlucometerDriver, ABC) return self._read_response(encrypted=encrypted) if _is_unknown_message_error(message): - raise exceptions.CommandError('Invalid command') + raise exceptions.CommandError("Invalid command") if _is_encryption_missing_error(message): - raise exceptions.CommandError( - 'Device encryption not initialized.') + raise exceptions.CommandError("Device encryption not initialized.") if _is_encryption_setup_error(message): - raise exceptions.CommandError( - 'Device encryption initialization failed.') + raise exceptions.CommandError("Device encryption initialization failed.") return message @@ -213,18 +225,19 @@ class FreeStyleHidDevice(hiddevice.HidDevice, driver_base.GlucometerDriver, ABC) self._send_command(self.TEXT_CMD, command) # Reply can stretch multiple buffers - full_content = b'' + full_content = b"" while True: message_type, content = self._read_response() logging.debug( - 'Received message: type %02x content %s', - message_type, content.hex()) + "Received message: type %02x content %s", message_type, content.hex() + ) if message_type != self.TEXT_REPLY_CMD: raise exceptions.InvalidResponse( - 'Message type %02x does not match expectations: %r' % - (message_type, content)) + "Message type %02x does not match expectations: %r" + % (message_type, content) + ) full_content += content @@ -235,17 +248,17 @@ class FreeStyleHidDevice(hiddevice.HidDevice, driver_base.GlucometerDriver, ABC) if not match: raise exceptions.InvalidResponse(full_content) - message = match.group('message') - _verify_checksum(message, match.group('checksum')) + message = match.group("message") + _verify_checksum(message, match.group("checksum")) - if match.group('status') != b'OK': + if match.group("status") != b"OK": raise exceptions.InvalidResponse(message or "Command failed") # If there is anything in the response that is not ASCII-safe, this is # probably in the patient name. The Windows utility does not seem to # validate those, so just replace anything non-ASCII with the correct # unknown codepoint. - return message.decode('ascii', 'replace') + return message.decode("ascii", "replace") # Some of the commands are also shared across devices that use this HID # protocol, but not many. Only provide here those that do seep to change @@ -253,16 +266,16 @@ class FreeStyleHidDevice(hiddevice.HidDevice, driver_base.GlucometerDriver, ABC) def _get_version(self): # type: () -> Text """Return the software version of the device.""" - return self._send_text_command(b'$swver?').rstrip('\r\n') + return self._send_text_command(b"$swver?").rstrip("\r\n") def get_serial_number(self): # type: () -> Text """Returns the serial number of the device.""" - return self._send_text_command(b'$serlnum?').rstrip('\r\n') + return self._send_text_command(b"$serlnum?").rstrip("\r\n") def get_patient_name(self): # type: () -> Optional[Text] - patient_name = self._send_text_command(b'$ptname?').rstrip('\r\n') + patient_name = self._send_text_command(b"$ptname?").rstrip("\r\n") if not patient_name: return None return patient_name @@ -270,11 +283,11 @@ class FreeStyleHidDevice(hiddevice.HidDevice, driver_base.GlucometerDriver, ABC) def set_patient_name(self, name): # type: (Text) -> None try: - encoded_name = name.encode('ascii') + encoded_name = name.encode("ascii") except UnicodeDecodeError: - raise ValueError('Only ASCII-safe names are tested working') + raise ValueError("Only ASCII-safe names are tested working") - result = self._send_text_command(b'$ptname,' + encoded_name) + result = self._send_text_command(b"$ptname," + encoded_name) def get_datetime(self): # type: () -> datetime.datetime @@ -283,12 +296,12 @@ class FreeStyleHidDevice(hiddevice.HidDevice, driver_base.GlucometerDriver, ABC) This is one of the few commands that appear common to many of the FreeStyle devices that use the HID framing protocol. """ - date = self._send_text_command(b'$date?').rstrip('\r\n') - time = self._send_text_command(b'$time?').rstrip('\r\n') + date = self._send_text_command(b"$date?").rstrip("\r\n") + time = self._send_text_command(b"$time?").rstrip("\r\n") # Year is returned as an offset to 2000. - month, day, year = (int(x) for x in date.split(',')) - hour, minute = (int(x) for x in time.split(',')) + month, day, year = (int(x) for x in date.split(",")) + hour, minute = (int(x) for x in time.split(",")) # At least Precision Neo devices can have an invalid date (bad RTC?), # and report 255 for each field, which is not valid for @@ -304,10 +317,10 @@ class FreeStyleHidDevice(hiddevice.HidDevice, driver_base.GlucometerDriver, ABC) # The format used by the FreeStyle devices is not composable based on # standard strftime() (namely it includes no leading zeros), so we need # to build it manually. - date_cmd = '$date,{month},{day},{year}'.format( - month=date.month, day=date.day, year=(date.year-2000)) - time_cmd = '$time,{hour},{minute}'.format( - hour=date.hour, minute=date.minute) + date_cmd = "$date,{month},{day},{year}".format( + month=date.month, day=date.day, year=(date.year - 2000) + ) + time_cmd = "$time,{hour},{minute}".format(hour=date.hour, minute=date.minute) self._send_text_command(bytes(date_cmd, "ascii")) self._send_text_command(bytes(time_cmd, "ascii")) @@ -333,7 +346,7 @@ class FreeStyleHidDevice(hiddevice.HidDevice, driver_base.GlucometerDriver, ABC) in the record file. """ message = self._send_text_command(command) - logging.debug('Received multirecord message:\n%s', message) + logging.debug("Received multirecord message:\n%s", message) if message == "Log Empty\r\n": return iter(()) @@ -341,9 +354,9 @@ class FreeStyleHidDevice(hiddevice.HidDevice, driver_base.GlucometerDriver, ABC) if not match: raise exceptions.InvalidResponse(message) - records_str = match.group('message') - _verify_checksum(records_str, match.group('checksum')) + records_str = match.group("message") + _verify_checksum(records_str, match.group("checksum")) - logging.debug('Received multi-record string: %s', records_str) + logging.debug("Received multi-record string: %s", records_str) - return csv.reader(records_str.split('\r\n')) + return csv.reader(records_str.split("\r\n")) |