summaryrefslogtreecommitdiffstats
path: root/freestyle_hid/_session.py
diff options
context:
space:
mode:
Diffstat (limited to 'freestyle_hid/_session.py')
-rw-r--r--freestyle_hid/_session.py264
1 files changed, 264 insertions, 0 deletions
diff --git a/freestyle_hid/_session.py b/freestyle_hid/_session.py
new file mode 100644
index 0000000..7529ef3
--- /dev/null
+++ b/freestyle_hid/_session.py
@@ -0,0 +1,264 @@
+# SPDX-FileCopyrightText: © 2013 The freestyle-hid Authors
+# SPDX-License-Identifier: Apache-2.0
+
+import csv
+import logging
+import pathlib
+import re
+from typing import AnyStr, Callable, Iterator, Optional, Sequence, Tuple
+
+import construct
+
+from ._exceptions import ChecksumError, CommandError
+from ._hidwrapper import HidWrapper
+
+ABBOTT_VENDOR_ID = 0x1A61
+
+_INIT_COMMAND = 0x01
+_INIT_RESPONSE = 0x71
+
+_KEEPALIVE_RESPONSE = 0x22
+_UNKNOWN_MESSAGE_RESPONSE = 0x30
+
+_ENCRYPTION_SETUP_COMMAND = 0x14
+_ENCRYPTION_SETUP_RESPONSE = 0x33
+
+_ALWAYS_UNENCRYPTED_MESSAGES = (
+ _INIT_COMMAND,
+ 0x04,
+ 0x05,
+ 0x06,
+ 0x0C,
+ 0x0D,
+ _ENCRYPTION_SETUP_COMMAND,
+ 0x15,
+ _ENCRYPTION_SETUP_RESPONSE,
+ 0x34,
+ 0x35,
+ _INIT_RESPONSE,
+ _KEEPALIVE_RESPONSE,
+)
+
+
+def _create_matcher(
+ message_type: int, content: Optional[bytes]
+) -> Callable[[Tuple[int, bytes]], bool]:
+ def _matcher(message: Tuple[int, bytes]) -> bool:
+ return message[0] == message_type and (content is None or content == message[1])
+
+ return _matcher
+
+
+_is_init_reply = _create_matcher(_INIT_RESPONSE, b"\x01")
+_is_keepalive_response = _create_matcher(_KEEPALIVE_RESPONSE, None)
+_is_unknown_message_error = _create_matcher(_UNKNOWN_MESSAGE_RESPONSE, b"\x85")
+_is_encryption_missing_error = _create_matcher(_ENCRYPTION_SETUP_RESPONSE, b"\x15")
+_is_encryption_setup_error = _create_matcher(_ENCRYPTION_SETUP_RESPONSE, b"\x14")
+
+_FREESTYLE_MESSAGE = construct.Struct(
+ hid_report=construct.Const(0, construct.Byte),
+ message_type=construct.Byte,
+ command=construct.Padded(
+ 63, # command can only be up to 62 bytes, but one is used for length.
+ construct.Prefixed(construct.Byte, construct.GreedyBytes),
+ ),
+)
+
+_FREESTYLE_ENCRYPTED_MESSAGE = construct.Struct(
+ hid_report=construct.Const(0, construct.Byte),
+ message_type=construct.Byte,
+ command=construct.Padded(
+ 63, # command can only be up to 62 bytes, but one is used for length.
+ construct.GreedyBytes,
+ ),
+)
+
+_TEXT_COMPLETION_RE = re.compile(b"CMD (?:OK|Fail!)")
+_TEXT_REPLY_FORMAT = re.compile(
+ b"^(?P<message>.*)CKSM:(?P<checksum>[0-9A-F]{8})\r\n"
+ b"CMD (?P<status>OK|Fail!)\r\n$",
+ re.DOTALL,
+)
+
+_MULTIRECORDS_FORMAT = re.compile(
+ "^(?P<message>.+\r\n)(?P<count>[0-9]+),(?P<checksum>[0-9A-F]{8})\r\n$", re.DOTALL
+)
+
+
+def _verify_checksum(message: AnyStr, expected_checksum_hex: AnyStr) -> None:
+ """Calculate the simple checksum of the message and compare with expected.
+
+ Args:
+ message: (str) message to calculate the checksum of.
+ expected_checksum_hex: hexadecimal string representing the checksum
+ expected to match the message.
+
+ Raises:
+ InvalidChecksum: if the message checksum calculated does not match the one
+ received.
+ """
+ expected_checksum = int(expected_checksum_hex, 16)
+ if isinstance(message, bytes):
+ all_bytes = (c for c in message)
+ else:
+ all_bytes = (ord(c) for c in message)
+
+ calculated_checksum = sum(all_bytes)
+
+ if expected_checksum != calculated_checksum:
+ raise ChecksumError(
+ f"Invalid checksum, expected {expected_checksum}, calculated {calculated_checksum}"
+ )
+
+
+class Session:
+ def __init__(
+ self,
+ product_id: Optional[int],
+ device_path: Optional[pathlib.Path],
+ text_message_type: int,
+ text_reply_message_type: int,
+ ) -> None:
+ self._handle = HidWrapper.open(device_path, ABBOTT_VENDOR_ID, product_id)
+
+ self._text_message_type = text_message_type
+ self._text_reply_message_type = text_reply_message_type
+
+ def connect(self):
+ """Open connection to the device, starting the knocking sequence."""
+ self.send_command(_INIT_COMMAND, b"")
+ response = self.read_response()
+ if not _is_init_reply(response):
+ raise ConnectionError(
+ f"Connection error: unexpected message %{response[0]:02x}:{response[1].hex()}"
+ )
+
+ def send_command(self, message_type: int, command: bytes, encrypted: bool = False):
+ """Send a raw command to the device.
+
+ Args:
+ message_type: The first byte sent with the report to the device.
+ command: The command to send out the device.
+ """
+ if encrypted:
+ assert message_type not in _ALWAYS_UNENCRYPTED_MESSAGES
+ meta_construct = _FREESTYLE_ENCRYPTED_MESSAGE
+ else:
+ meta_construct = _FREESTYLE_MESSAGE
+
+ usb_packet = meta_construct.build(
+ {"message_type": message_type, "command": command}
+ )
+
+ logging.debug(f"Sending packet: {usb_packet!r}")
+ self._handle.write(usb_packet)
+
+ def read_response(self, encrypted: bool = False) -> Tuple[int, bytes]:
+ """Read the response from the device and extracts it."""
+ usb_packet = self._handle.read()
+
+ logging.debug(f"Read packet: {usb_packet!r}")
+
+ assert usb_packet
+ message_type = usb_packet[0]
+
+ if not encrypted or message_type in _ALWAYS_UNENCRYPTED_MESSAGES:
+ message_length = usb_packet[1]
+ message_end_idx = 2 + message_length
+ message_content = usb_packet[2:message_end_idx]
+ else:
+ message_content = usb_packet[1:]
+
+ # hidapi module returns a list of bytes rather than a bytes object.
+ message = (message_type, bytes(message_content))
+
+ # There appears to be a stray number of 22 01 xx messages being returned
+ # by some devices after commands are sent. These do not appear to have
+ # meaning, so ignore them and proceed to the next. These are always sent
+ # unencrypted, so we need to inspect them before we decide what the
+ # message content is.
+ if _is_keepalive_response(message):
+ return self.read_response(encrypted=encrypted)
+
+ if _is_unknown_message_error(message):
+ raise CommandError("Invalid command")
+
+ if _is_encryption_missing_error(message):
+ raise CommandError("Device encryption not initialized.")
+
+ if _is_encryption_setup_error(message):
+ raise CommandError("Device encryption initialization failed.")
+
+ return message
+
+ def send_text_command(self, command: bytes) -> str:
+ """Send a command to the device that expects a text reply."""
+ self.send_command(self._text_message_type, command)
+
+ # Reply can stretch multiple buffers
+ full_content = b""
+ while True:
+ message_type, content = self.read_response()
+
+ logging.debug(
+ f"Received message: type {message_type:02x} content {content.hex()}"
+ )
+
+ if message_type != self._text_reply_message_type:
+ raise CommandError(
+ f"Message type {message_type:02x}: content does not match expectations: {content!r}"
+ )
+
+ full_content += content
+
+ if _TEXT_COMPLETION_RE.search(full_content):
+ break
+
+ match = _TEXT_REPLY_FORMAT.search(full_content)
+ if not match:
+ raise CommandError(repr(full_content))
+
+ message = match.group("message")
+ _verify_checksum(message, match.group("checksum"))
+
+ if match.group("status") != b"OK":
+ raise CommandError(repr(message) or "Command failed")
+
+ # If there is anything in the response that is not ASCII-safe, this is
+ # probably in the patient name. The Windows utility does not seem to
+ # validate those, so just replace anything non-ASCII with the correct
+ # unknown codepoint.
+ return message.decode("ascii", "replace")
+
+ def query_multirecord(self, command: bytes) -> Iterator[Sequence[str]]:
+ """Queries for, and returns, "multirecords" results.
+
+ Multirecords are used for querying events, readings, history and similar
+ other data out of a FreeStyle device. These are comma-separated values,
+ variable-length.
+
+ The validation includes the general HID framing parsing, as well as
+ validation of the record count, and of the embedded records checksum.
+
+ Args:
+ command: The text command to send to the device for the query.
+
+ Returns:
+ A CSV reader object that returns a record for each line in the
+ reply buffer.
+ """
+ message = self.send_text_command(command)
+ logging.debug(f"Received multirecord message:\n{message}")
+ if message == "Log Empty\r\n":
+ return iter(())
+
+ match = _MULTIRECORDS_FORMAT.search(message)
+ if not match:
+ raise CommandError(message)
+
+ records_str = match.group("message")
+ _verify_checksum(records_str, match.group("checksum"))
+
+ logging.debug(f"Received multi-record string: {records_str}")
+
+ return csv.reader(records_str.split("\r\n"))