summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--glucometerutils/support/freestyle.py97
1 files changed, 80 insertions, 17 deletions
diff --git a/glucometerutils/support/freestyle.py b/glucometerutils/support/freestyle.py
index c94a92b..54edc7c 100644
--- a/glucometerutils/support/freestyle.py
+++ b/glucometerutils/support/freestyle.py
@@ -12,7 +12,7 @@ import csv
import datetime
import logging
import re
-from typing import AnyStr, Iterator, List, Optional, Text, Tuple
+from typing import AnyStr, Callable, Iterator, List, Optional, Text, Tuple
import construct
@@ -20,6 +20,38 @@ from glucometerutils import exceptions
from glucometerutils.support import hiddevice
_INIT_COMMAND = 0x01
+_INIT_RESPONSE = 0x71
+
+_KEEPALIVE_RESPONSE = 0x22
+_UNKNOWN_MESSAGE_RESPONSE = 0x30
+
+_ENCRYPTION_SETUP_COMMAND = 0x14
+_ENCRYPTION_SETUP_RESPONSE = 0x33
+
+_ALWAYS_UNENCRYPTED_MESSAGES = (
+ _INIT_COMMAND, 0x04, 0x05, 0x06, 0x0c, 0x0d,
+ _ENCRYPTION_SETUP_COMMAND, 0x15,
+ _ENCRYPTION_SETUP_RESPONSE, 0x34, 0x35,
+ _INIT_RESPONSE,
+ _KEEPALIVE_RESPONSE,
+)
+
+def _create_matcher(message_type, content):
+ # type: (int, Optional[bytes]) -> Callable[[Tuple[int, bytes]], bool]
+ def _matcher(message):
+ return (
+ message[0] == message_type and
+ (content is None or content == message[1]))
+
+ return _matcher
+
+_is_init_reply = _create_matcher(_INIT_RESPONSE, b'\x01')
+_is_keepalive_response = _create_matcher(_KEEPALIVE_RESPONSE, b'\x05')
+_is_uknown_message_error = _create_matcher(_UNKNOWN_MESSAGE_RESPONSE, b'\x85')
+_is_encryption_missing_error = _create_matcher(
+ _ENCRYPTION_SETUP_RESPONSE, b'\x15')
+_is_encryption_setup_error = _create_matcher(
+ _ENCRYPTION_SETUP_RESPONSE, b'\x14')
_FREESTYLE_MESSAGE = construct.Struct(
'hid_report' / construct.Const(0, construct.Byte),
@@ -29,6 +61,14 @@ _FREESTYLE_MESSAGE = construct.Struct(
construct.Prefixed(construct.Byte, construct.GreedyBytes)),
)
+_FREESTYLE_ENCRYPTED_MESSAGE = construct.Struct(
+ 'hid_report' / construct.Const(0, construct.Byte),
+ 'message_type' / construct.Byte,
+ 'command' / construct.Padded(
+ 63, # command can only be up to 62 bytes, but one is used for length.
+ construct.GreedyBytes),
+)
+
_TEXT_COMPLETION_RE = re.compile(b'CMD (?:OK|Fail!)')
_TEXT_REPLY_FORMAT = re.compile(
b'^(?P<message>.*)CKSM:(?P<checksum>[0-9A-F]{8})\r\n'
@@ -94,28 +134,38 @@ class FreeStyleHidDevice(hiddevice.HidDevice):
def connect(self):
"""Open connection to the device, starting the knocking sequence."""
self._send_command(_INIT_COMMAND, b'')
- self._read_response() # Only expect non-error result.
+ response = self._read_response()
+ if not _is_init_reply(response):
+ raise exceptions.ConnectionFailed(
+ 'Connection error: unexpected message %02x:%s' % (
+ response[0], response[1].hex()))
def disconnect(self):
"""Disconnect the device, nothing to be done."""
pass
- def _send_command(self, message_type, command):
- # type: (int, bytes) -> None
+ def _send_command(self, message_type, command, encrypted=False):
+ # type: (int, bytes, bool) -> None
"""Send a raw command to the device.
Args:
message_type: (int) The first byte sent with the report to the device.
command: (bytes) The command to send out the device.
"""
- usb_packet = _FREESTYLE_MESSAGE.build(
+ if encrypted:
+ assert message_type not in _ALWAYS_UNENCRYPTED_MESSAGES
+ meta_construct = _FREESTYLE_ENCRYPTED_MESSAGE
+ else:
+ meta_construct = _FREESTYLE_MESSAGE
+
+ usb_packet = meta_construct.build(
{'message_type': message_type, 'command': command})
logging.debug('Sending packet: %r', usb_packet)
self._write(usb_packet)
- def _read_response(self):
- # type: () -> Tuple[int, bytes]
+ def _read_response(self, encrypted=False):
+ # type: (bool) -> Tuple[int, bytes]
"""Read the response from the device and extracts it."""
usb_packet = self._read()
@@ -123,24 +173,36 @@ class FreeStyleHidDevice(hiddevice.HidDevice):
assert usb_packet
message_type = usb_packet[0]
- message_length = usb_packet[1]
- message_content = usb_packet[2:2+message_length]
+
+ if not encrypted or message_type in _ALWAYS_UNENCRYPTED_MESSAGES:
+ message_length = usb_packet[1]
+ message_content = usb_packet[2:2+message_length]
+ else:
+ message_content = usb_packet[1:]
+
+ # hidapi module returns a list of bytes rather than a bytes object.
+ message = (message_type, bytes(message_content))
# There appears to be a stray number of 22 01 xx messages being returned
# by some devices after commands are sent. These do not appear to have
- # meaning, so ignore them and proceed to the next.
- if message_type == 0x22 and message_length == 1:
- return self._read_response()
+ # meaning, so ignore them and proceed to the next. These are always sent
+ # unencrypted, so we need to inspect them before we decide what the
+ # message content is.
+ if _is_keepalive_response(message):
+ return self._read_response(encrypted=encrypted)
- if message_type == 0x30 and message_content == b'\x85':
+ if _is_uknown_message_error(message):
raise exceptions.CommandError('Invalid command')
- if message_type == 0x33 and message_content == b'\x15':
+ if _is_encryption_missing_error(message):
+ raise exceptions.CommandError(
+ 'Device encryption not initialized.')
+
+ if _is_encryption_setup_error(message):
raise exceptions.CommandError(
'Device encryption initialization failed.')
- # hidapi module returns a list of bytes rather than a bytes object.
- return (message_type, bytes(message_content))
+ return message
def _send_text_command(self, command):
# type: (bytes) -> Text
@@ -153,7 +215,8 @@ class FreeStyleHidDevice(hiddevice.HidDevice):
message_type, content = self._read_response()
logging.debug(
- 'Received message: type %02x content %r', message_type, content)
+ 'Received message: type %02x content %s',
+ message_type, content.hex())
if message_type != self.TEXT_REPLY_CMD:
raise exceptions.InvalidResponse(