From f745ce94f71c16927b7ddb91986d6c026c21e7ba Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Diego=20Elio=20Petten=C3=B2?= Date: Sun, 4 Oct 2020 15:08:43 +0100 Subject: Initial import of freestyle-hid. This library is a factor-out of https://github.com/glucometers-tech/glucometerutils to only include the FreeStyle implementation, to make it easier to use outside of glucometerutils, and in particular to make it easier to build better reverse engineering tooling around it. Note that since the code was a mix of MIT and Apache-2.0 license, the overall license of the library is written down as Apache-2.0, as that would be a super-set of the requirements from MIT. --- freestyle_hid/__init__.py | 5 + freestyle_hid/_exceptions.py | 18 ++ freestyle_hid/_hidwrapper.py | 66 ++++++ freestyle_hid/_session.py | 264 +++++++++++++++++++++++ freestyle_hid/py.typed | 2 + freestyle_hid/tools/__init__.py | 3 + freestyle_hid/tools/encrypted_setup_extractor.py | 162 ++++++++++++++ freestyle_hid/tools/extract_chatter.py | 233 ++++++++++++++++++++ freestyle_hid/tools/hid_console.py | 80 +++++++ freestyle_hid/tools/py.typed | 2 + 10 files changed, 835 insertions(+) create mode 100644 freestyle_hid/__init__.py create mode 100644 freestyle_hid/_exceptions.py create mode 100644 freestyle_hid/_hidwrapper.py create mode 100644 freestyle_hid/_session.py create mode 100644 freestyle_hid/py.typed create mode 100644 freestyle_hid/tools/__init__.py create mode 100644 freestyle_hid/tools/encrypted_setup_extractor.py create mode 100755 freestyle_hid/tools/extract_chatter.py create mode 100755 freestyle_hid/tools/hid_console.py create mode 100644 freestyle_hid/tools/py.typed (limited to 'freestyle_hid') diff --git a/freestyle_hid/__init__.py b/freestyle_hid/__init__.py new file mode 100644 index 0000000..41dce11 --- /dev/null +++ b/freestyle_hid/__init__.py @@ -0,0 +1,5 @@ +# SPDX-FileCopyrightText: © 2020 The freestyle-hid Authors +# SPDX-License-Identifier: Apache-2.0 + +from ._exceptions import * # noqa: F403,F401 +from ._session import Session # noqa: F403,F401 diff --git a/freestyle_hid/_exceptions.py b/freestyle_hid/_exceptions.py new file mode 100644 index 0000000..38a822d --- /dev/null +++ b/freestyle_hid/_exceptions.py @@ -0,0 +1,18 @@ +# SPDX-FileCopyrightText: © 2020 The freestyle-hid Authors +# SPDX-License-Identifier: Apache-2.0 + + +class HIDError(Exception): + """Errors related to the HID access process.""" + + +class ConnectionError(Exception): + """Errors related to Session establishment.""" + + +class ChecksumError(Exception): + """Errors related to the transmission checksums.""" + + +class CommandError(Exception): + """Errors related to the command stream.""" diff --git a/freestyle_hid/_hidwrapper.py b/freestyle_hid/_hidwrapper.py new file mode 100644 index 0000000..a82bd4c --- /dev/null +++ b/freestyle_hid/_hidwrapper.py @@ -0,0 +1,66 @@ +# SPDX-FileCopyrightText: © 2020 The freestyle-hid Authors +# SPDX-License-Identifier: Apache-2.0 +"""HID wrappers to access files with either hidraw or cython-hidapi.""" + +import abc +import pathlib +from typing import BinaryIO, Optional, Union + +try: + import hid +except ImportError: + hid = None + +from ._exceptions import HIDError + + +class HidWrapper(abc.ABC): + + _handle: Union[BinaryIO, "hid.device"] + + def write(self, report: bytes) -> None: + if len(report) > 65: + raise HIDError(f"Invalid report length {len(report)}.") + + written = self._handle.write(report) + if written < 0: + raise HIDError(f"Invalid write ({written}).") + + @abc.abstractmethod + def read(self, size: int = 64) -> bytes: + pass + + @staticmethod + def open( + device_path: Optional[pathlib.Path], vendor_id: int, product_id: Optional[int] + ) -> "HidWrapper": + if device_path: + return HidRaw(device_path) + else: + assert product_id is not None + return HidApi(vendor_id, product_id) + + +class HidRaw(HidWrapper): + def __init__(self, device_path: pathlib.Path) -> None: + if not device_path.exists(): + raise ValueError(f"Path {device_path} does not exists.") + + self._handle = device_path.open("w+b") + + def read(self, size: int = 64) -> bytes: + return self._handle.read(size) + + +class HidApi(HidWrapper): + _handle: "hid.device" + + def __init__(self, vendor_id: int, product_id: int) -> None: + if hid is None: + raise ValueError("cython-hidapi not found.") + + self._handle = hid.device() + self._handle.open(vendor_id, product_id) + + def read(self, size: int = 64) -> bytes: + return bytes(self._handle.read(size, timeout_ms=0)) diff --git a/freestyle_hid/_session.py b/freestyle_hid/_session.py new file mode 100644 index 0000000..7529ef3 --- /dev/null +++ b/freestyle_hid/_session.py @@ -0,0 +1,264 @@ +# SPDX-FileCopyrightText: © 2013 The freestyle-hid Authors +# SPDX-License-Identifier: Apache-2.0 + +import csv +import logging +import pathlib +import re +from typing import AnyStr, Callable, Iterator, Optional, Sequence, Tuple + +import construct + +from ._exceptions import ChecksumError, CommandError +from ._hidwrapper import HidWrapper + +ABBOTT_VENDOR_ID = 0x1A61 + +_INIT_COMMAND = 0x01 +_INIT_RESPONSE = 0x71 + +_KEEPALIVE_RESPONSE = 0x22 +_UNKNOWN_MESSAGE_RESPONSE = 0x30 + +_ENCRYPTION_SETUP_COMMAND = 0x14 +_ENCRYPTION_SETUP_RESPONSE = 0x33 + +_ALWAYS_UNENCRYPTED_MESSAGES = ( + _INIT_COMMAND, + 0x04, + 0x05, + 0x06, + 0x0C, + 0x0D, + _ENCRYPTION_SETUP_COMMAND, + 0x15, + _ENCRYPTION_SETUP_RESPONSE, + 0x34, + 0x35, + _INIT_RESPONSE, + _KEEPALIVE_RESPONSE, +) + + +def _create_matcher( + message_type: int, content: Optional[bytes] +) -> Callable[[Tuple[int, bytes]], bool]: + def _matcher(message: Tuple[int, bytes]) -> bool: + return message[0] == message_type and (content is None or content == message[1]) + + return _matcher + + +_is_init_reply = _create_matcher(_INIT_RESPONSE, b"\x01") +_is_keepalive_response = _create_matcher(_KEEPALIVE_RESPONSE, None) +_is_unknown_message_error = _create_matcher(_UNKNOWN_MESSAGE_RESPONSE, b"\x85") +_is_encryption_missing_error = _create_matcher(_ENCRYPTION_SETUP_RESPONSE, b"\x15") +_is_encryption_setup_error = _create_matcher(_ENCRYPTION_SETUP_RESPONSE, b"\x14") + +_FREESTYLE_MESSAGE = construct.Struct( + hid_report=construct.Const(0, construct.Byte), + message_type=construct.Byte, + command=construct.Padded( + 63, # command can only be up to 62 bytes, but one is used for length. + construct.Prefixed(construct.Byte, construct.GreedyBytes), + ), +) + +_FREESTYLE_ENCRYPTED_MESSAGE = construct.Struct( + hid_report=construct.Const(0, construct.Byte), + message_type=construct.Byte, + command=construct.Padded( + 63, # command can only be up to 62 bytes, but one is used for length. + construct.GreedyBytes, + ), +) + +_TEXT_COMPLETION_RE = re.compile(b"CMD (?:OK|Fail!)") +_TEXT_REPLY_FORMAT = re.compile( + b"^(?P.*)CKSM:(?P[0-9A-F]{8})\r\n" + b"CMD (?POK|Fail!)\r\n$", + re.DOTALL, +) + +_MULTIRECORDS_FORMAT = re.compile( + "^(?P.+\r\n)(?P[0-9]+),(?P[0-9A-F]{8})\r\n$", re.DOTALL +) + + +def _verify_checksum(message: AnyStr, expected_checksum_hex: AnyStr) -> None: + """Calculate the simple checksum of the message and compare with expected. + + Args: + message: (str) message to calculate the checksum of. + expected_checksum_hex: hexadecimal string representing the checksum + expected to match the message. + + Raises: + InvalidChecksum: if the message checksum calculated does not match the one + received. + """ + expected_checksum = int(expected_checksum_hex, 16) + if isinstance(message, bytes): + all_bytes = (c for c in message) + else: + all_bytes = (ord(c) for c in message) + + calculated_checksum = sum(all_bytes) + + if expected_checksum != calculated_checksum: + raise ChecksumError( + f"Invalid checksum, expected {expected_checksum}, calculated {calculated_checksum}" + ) + + +class Session: + def __init__( + self, + product_id: Optional[int], + device_path: Optional[pathlib.Path], + text_message_type: int, + text_reply_message_type: int, + ) -> None: + self._handle = HidWrapper.open(device_path, ABBOTT_VENDOR_ID, product_id) + + self._text_message_type = text_message_type + self._text_reply_message_type = text_reply_message_type + + def connect(self): + """Open connection to the device, starting the knocking sequence.""" + self.send_command(_INIT_COMMAND, b"") + response = self.read_response() + if not _is_init_reply(response): + raise ConnectionError( + f"Connection error: unexpected message %{response[0]:02x}:{response[1].hex()}" + ) + + def send_command(self, message_type: int, command: bytes, encrypted: bool = False): + """Send a raw command to the device. + + Args: + message_type: The first byte sent with the report to the device. + command: The command to send out the device. + """ + if encrypted: + assert message_type not in _ALWAYS_UNENCRYPTED_MESSAGES + meta_construct = _FREESTYLE_ENCRYPTED_MESSAGE + else: + meta_construct = _FREESTYLE_MESSAGE + + usb_packet = meta_construct.build( + {"message_type": message_type, "command": command} + ) + + logging.debug(f"Sending packet: {usb_packet!r}") + self._handle.write(usb_packet) + + def read_response(self, encrypted: bool = False) -> Tuple[int, bytes]: + """Read the response from the device and extracts it.""" + usb_packet = self._handle.read() + + logging.debug(f"Read packet: {usb_packet!r}") + + assert usb_packet + message_type = usb_packet[0] + + if not encrypted or message_type in _ALWAYS_UNENCRYPTED_MESSAGES: + message_length = usb_packet[1] + message_end_idx = 2 + message_length + message_content = usb_packet[2:message_end_idx] + else: + message_content = usb_packet[1:] + + # hidapi module returns a list of bytes rather than a bytes object. + message = (message_type, bytes(message_content)) + + # There appears to be a stray number of 22 01 xx messages being returned + # by some devices after commands are sent. These do not appear to have + # meaning, so ignore them and proceed to the next. These are always sent + # unencrypted, so we need to inspect them before we decide what the + # message content is. + if _is_keepalive_response(message): + return self.read_response(encrypted=encrypted) + + if _is_unknown_message_error(message): + raise CommandError("Invalid command") + + if _is_encryption_missing_error(message): + raise CommandError("Device encryption not initialized.") + + if _is_encryption_setup_error(message): + raise CommandError("Device encryption initialization failed.") + + return message + + def send_text_command(self, command: bytes) -> str: + """Send a command to the device that expects a text reply.""" + self.send_command(self._text_message_type, command) + + # Reply can stretch multiple buffers + full_content = b"" + while True: + message_type, content = self.read_response() + + logging.debug( + f"Received message: type {message_type:02x} content {content.hex()}" + ) + + if message_type != self._text_reply_message_type: + raise CommandError( + f"Message type {message_type:02x}: content does not match expectations: {content!r}" + ) + + full_content += content + + if _TEXT_COMPLETION_RE.search(full_content): + break + + match = _TEXT_REPLY_FORMAT.search(full_content) + if not match: + raise CommandError(repr(full_content)) + + message = match.group("message") + _verify_checksum(message, match.group("checksum")) + + if match.group("status") != b"OK": + raise CommandError(repr(message) or "Command failed") + + # If there is anything in the response that is not ASCII-safe, this is + # probably in the patient name. The Windows utility does not seem to + # validate those, so just replace anything non-ASCII with the correct + # unknown codepoint. + return message.decode("ascii", "replace") + + def query_multirecord(self, command: bytes) -> Iterator[Sequence[str]]: + """Queries for, and returns, "multirecords" results. + + Multirecords are used for querying events, readings, history and similar + other data out of a FreeStyle device. These are comma-separated values, + variable-length. + + The validation includes the general HID framing parsing, as well as + validation of the record count, and of the embedded records checksum. + + Args: + command: The text command to send to the device for the query. + + Returns: + A CSV reader object that returns a record for each line in the + reply buffer. + """ + message = self.send_text_command(command) + logging.debug(f"Received multirecord message:\n{message}") + if message == "Log Empty\r\n": + return iter(()) + + match = _MULTIRECORDS_FORMAT.search(message) + if not match: + raise CommandError(message) + + records_str = match.group("message") + _verify_checksum(records_str, match.group("checksum")) + + logging.debug(f"Received multi-record string: {records_str}") + + return csv.reader(records_str.split("\r\n")) diff --git a/freestyle_hid/py.typed b/freestyle_hid/py.typed new file mode 100644 index 0000000..311e481 --- /dev/null +++ b/freestyle_hid/py.typed @@ -0,0 +1,2 @@ +# SPDX-FileCopyrightText: © 2020 The freestyle-hid Authors +# SPDX-License-Identifier: Apache-2.0 diff --git a/freestyle_hid/tools/__init__.py b/freestyle_hid/tools/__init__.py new file mode 100644 index 0000000..3e0558f --- /dev/null +++ b/freestyle_hid/tools/__init__.py @@ -0,0 +1,3 @@ +# SPDX-FileCopyrightText: 2013 The freestyle-hid Authors +# +# SPDX-License-Identifier: 0BSD diff --git a/freestyle_hid/tools/encrypted_setup_extractor.py b/freestyle_hid/tools/encrypted_setup_extractor.py new file mode 100644 index 0000000..dfe8229 --- /dev/null +++ b/freestyle_hid/tools/encrypted_setup_extractor.py @@ -0,0 +1,162 @@ +#!/usr/bin/env python3 +# +# SPDX-FileCopyrightText: © 2019 The usbmon-tools Authors +# SPDX-FileCopyrightText: © 2019 The freestyle-hid Authors +# +# SPDX-License-Identifier: Apache-2.0 + +import logging +import sys +from typing import BinaryIO, Sequence + +import click +import click_log +import construct +import usbmon +import usbmon.pcapng + +logger = logging.getLogger() +click_log.basic_config(logger) + + +_SERIAL_NUMBER_RESPONSE_TYPE = 0x06 +_ENCRYPTION_SETUP_REQ_TYPE = 0x14 +_ENCRYPTION_SETUP_RESP_TYPE = 0x33 + + +_START_AUTHORIZE_CMD = 0x11 +_CHALLENGE_CMD = 0x16 +_CHALLENGE_RESPONSE_CMD = 0x17 + + +_ABBOTT_VENDOR_ID = 0x1A61 +_LIBRE2_PRODUCT_ID = 0x3950 + +_SERIAL_NO = construct.Struct( + message_type=construct.Const(_SERIAL_NUMBER_RESPONSE_TYPE, construct.Byte), + length=construct.Const(14, construct.Byte), + serial_number=construct.PaddedString(13, "ascii"), + termination=construct.Const(0, construct.Byte), +) + +_CHALLENGE = construct.Struct( + message_type=construct.Const(_ENCRYPTION_SETUP_RESP_TYPE, construct.Byte), + length=construct.Const(16, construct.Byte), + subcmd=construct.Const(_CHALLENGE_CMD, construct.Byte), + challenge=construct.Bytes(8), + iv=construct.Bytes(7), +) + +_CHALLENGE_RESPONSE = construct.Struct( + message_type=construct.Const(_ENCRYPTION_SETUP_REQ_TYPE, construct.Byte), + length=construct.Const(26, construct.Byte), + subcmd=construct.Const(_CHALLENGE_RESPONSE_CMD, construct.Byte), + challenge_response_encrypted=construct.Bytes(16), + const=construct.Const(1, construct.Byte), + mac=construct.Bytes(8), +) + + +@click.command() +@click_log.simple_verbosity_option(logger, "--vlog") +@click.option( + "--device-address", + help=( + "Device address (busnum.devnum) of the device to extract capture" + " of. If none provided, device descriptors will be relied on." + ), +) +@click.argument( + "pcap-files", + type=click.File(mode="rb"), + nargs=None, +) +def main(*, device_address: str, pcap_files: Sequence[BinaryIO]): + if sys.version_info < (3, 7): + raise Exception("Unsupported Python version, please use at least Python 3.7.") + + for pcap_file in pcap_files: + session = usbmon.pcapng.parse_stream(pcap_file, retag_urbs=False) + + if not device_address: + for descriptor in session.device_descriptors.values(): + if ( + descriptor.vendor_id == _ABBOTT_VENDOR_ID + and descriptor.product_id == _LIBRE2_PRODUCT_ID + ): + if device_address and device_address != descriptor.address: + raise Exception( + "Multiple Libre2 devices present in capture, please" + " provide a --device-address flag." + ) + device_address = descriptor.address + else: + device_address = descriptor.address + + if device_address in session.device_descriptors: + descriptor = session.device_descriptors[device_address] + assert descriptor.vendor_id == _ABBOTT_VENDOR_ID + assert descriptor.product_id == _LIBRE2_PRODUCT_ID + + serial_number = "UNKNOWN" + challenge = "UNKNOWN" + iv = "UNKNOWN" + encrypted_challenge = "UNKNOWN" + mac = "UNKNOWN" + + for first, second in session.in_pairs(): + # Ignore stray callbacks/errors. + if not first.type == usbmon.constants.PacketType.SUBMISSION: + continue + + if not first.address.startswith(f"{device_address}."): + # No need to check second, they will be linked. + continue + + if first.xfer_type == usbmon.constants.XferType.INTERRUPT: + pass + elif ( + first.xfer_type == usbmon.constants.XferType.CONTROL + and not first.setup_packet + or first.setup_packet.type == usbmon.setup.Type.CLASS # type: ignore + ): + pass + else: + continue + + if first.direction == usbmon.constants.Direction.OUT: + packet = first + else: + assert second is not None + packet = second + + if not packet.payload: + continue + + assert len(packet.payload) >= 2 + + message_type = packet.payload[0] + + if message_type == _SERIAL_NUMBER_RESPONSE_TYPE: + obj = _SERIAL_NO.parse(packet.payload) + serial_number = obj.serial_number + elif ( + message_type == _ENCRYPTION_SETUP_RESP_TYPE + and packet.payload[2] == _CHALLENGE_CMD + ): + obj = _CHALLENGE.parse(packet.payload) + challenge = obj.challenge.hex() + iv = obj.iv.hex() + elif ( + message_type == _ENCRYPTION_SETUP_REQ_TYPE + and packet.payload[2] == _CHALLENGE_RESPONSE_CMD + ): + obj = _CHALLENGE_RESPONSE.parse(packet.payload) + encrypted_challenge = obj.challenge_response_encrypted.hex() + mac = obj.mac.hex() + + print(f"{serial_number},{challenge},{iv},{encrypted_challenge},{mac}") + + +if __name__ == "__main__": + main() diff --git a/freestyle_hid/tools/extract_chatter.py b/freestyle_hid/tools/extract_chatter.py new file mode 100755 index 0000000..a77a0ec --- /dev/null +++ b/freestyle_hid/tools/extract_chatter.py @@ -0,0 +1,233 @@ +#!/usr/bin/env python3 +# +# SPDX-FileCopyrightText: © 2019 The usbmon-tools Authors +# SPDX-FileCopyrightText: © 2020 The freestyle-hid Authors +# +# SPDX-License-Identifier: Apache-2.0 + +import logging +import sys +import textwrap +from typing import BinaryIO + +import click +import click_log +import construct +import usbmon +import usbmon.chatter +import usbmon.pcapng + +logger = logging.getLogger() +click_log.basic_config(logger) + +_KEEPALIVE_TYPE = 0x22 + +_UNENCRYPTED_TYPES = ( + 0x01, + 0x04, + 0x05, + 0x06, + 0x0C, + 0x0D, + 0x14, + 0x15, + 0x33, + 0x34, + 0x35, + 0x71, + _KEEPALIVE_TYPE, +) + +_ENCRYPTION_SETUP_TYPES = (0x14, 0x33) + +_START_AUTHORIZE_CMD = 0x11 +_CHALLENGE_CMD = 0x16 +_CHALLENGE_RESPONSE_CMD = 0x17 +_CHALLENGE_ACCEPTED_CMD = 0x18 + +_ABBOTT_VENDOR_ID = 0x1A61 +_LIBRE2_PRODUCT_ID = 0x3950 + +_ENCRYPTED_MESSAGE = construct.Struct( + message_type=construct.Byte, + encrypted_message=construct.Bytes(64 - 1 - 4 - 4), + sequence_number=construct.Int32ul, + mac=construct.Int32ul, +) + + +@click.command() +@click_log.simple_verbosity_option(logger, "--vlog") +@click.option( + "--device-address", + help=( + "Device address (busnum.devnum) of the device to extract capture" + " of. If none provided, device descriptors will be relied on." + ), +) +@click.option( + "--encrypted-protocol / --no-encrypted-protocol", + default=False, + help=( + "Whether to expect encrypted protocol in the capture." + " Ignored if the device descriptors are present in the capture." + ), +) +@click.option( + "--verbose-encryption-setup / --no-verbose-encryption-setup", + default=False, + help=( + "Whether to parse encryption setup commands and printing their component" + " together with the raw messsage." + ), +) +@click.option( + "--print-keepalive / --no-print-keepalive", + default=False, + help=( + "Whether to print the keepalive messages sent by the device. " + "Keepalive messages are usually safely ignored." + ), +) +@click.argument( + "pcap-file", + type=click.File(mode="rb"), +) +def main( + *, + device_address: str, + encrypted_protocol: bool, + verbose_encryption_setup: bool, + print_keepalive: bool, + pcap_file: BinaryIO, +) -> None: + if sys.version_info < (3, 7): + raise Exception("Unsupported Python version, please use at least Python 3.7.") + + session = usbmon.pcapng.parse_stream(pcap_file, retag_urbs=False) + + if not device_address: + for descriptor in session.device_descriptors.values(): + if descriptor.vendor_id == _ABBOTT_VENDOR_ID: + if device_address and device_address != descriptor.address: + raise Exception( + "Multiple Abbott device present in capture, please" + " provide a --device-address flag." + ) + device_address = descriptor.address + + if device_address not in session.device_descriptors: + logging.warning( + f"Unable to find device {device_address} in the capture's descriptors." + " Assuming non-encrypted protocol.", + ) + else: + descriptor = session.device_descriptors[device_address] + assert descriptor.vendor_id == _ABBOTT_VENDOR_ID + + if descriptor.product_id == _LIBRE2_PRODUCT_ID: + encrypted_protocol = True + + for first, second in session.in_pairs(): + # Ignore stray callbacks/errors. + if not first.type == usbmon.constants.PacketType.SUBMISSION: + continue + + if not first.address.startswith(f"{device_address}."): + # No need to check second, they will be linked. + continue + + if first.xfer_type == usbmon.constants.XferType.INTERRUPT: + pass + elif ( + first.xfer_type == usbmon.constants.XferType.CONTROL + and not first.setup_packet + or first.setup_packet.type == usbmon.setup.Type.CLASS # type: ignore + ): + pass + else: + continue + + if first.direction == usbmon.constants.Direction.OUT: + packet = first + else: + assert second is not None + packet = second + + if not packet.payload: + continue + + assert len(packet.payload) >= 2 + + message_type = packet.payload[0] + + if message_type == _KEEPALIVE_TYPE and not print_keepalive: + continue + + message_metadata = [] + + if encrypted_protocol and message_type not in _UNENCRYPTED_TYPES: + # With encrypted communication, the length of the message is also encrypted, + # and all the packets use the full 64 bytes. So instead, we extract what + # metadata we can. + parsed = _ENCRYPTED_MESSAGE.parse(packet.payload) + message_metadata.extend( + [f"SEQUENCE_NUMBER={parsed.sequence_number}", f"MAC={parsed.mac:04x}"] + ) + + message_type_str = f"x{message_type:02x}" + message = parsed.encrypted_message + elif verbose_encryption_setup and message_type in _ENCRYPTION_SETUP_TYPES: + message_length = packet.payload[1] + message_end_idx = 2 + message_length + message = packet.payload[2:message_end_idx] + + if message[0] == _START_AUTHORIZE_CMD: + message_metadata.append("START_AUTHORIZE") + elif message[0] == _CHALLENGE_CMD: + message_metadata.append("CHALLENGE") + challenge = message[1:9] + iv = message[9:16] + message_metadata.append(f"CHALLENGE={challenge.hex()}") + message_metadata.append(f"IV={iv.hex()}") + elif message[0] == _CHALLENGE_RESPONSE_CMD: + message_metadata.append("CHALLENGE_RESPONSE") + encrypted_challenge = message[1:17] + challenge_mac = message[18:26] + message_metadata.append( + f"ENCRYPTED_CHALLENGE={encrypted_challenge.hex()}" + ) + message_metadata.append(f"MAC={challenge_mac.hex()}") + elif message[0] == _CHALLENGE_ACCEPTED_CMD: + message_metadata.append("CHALLENGE_ACCEPTED") + + message_metadata.append(f"RAW_LENGTH={message_length}") + message_type_str = f" {message_type:02x}" + else: + message_length = packet.payload[1] + message_metadata.append(f"LENGTH={message_length}") + message_end_idx = 2 + message_length + message_type_str = f" {message_type:02x}" + message = packet.payload[2:message_end_idx] + + if message_metadata: + metadata_string = "\n".join( + textwrap.wrap( + " ".join(message_metadata), width=80, break_long_words=False + ) + ) + print(metadata_string) + + print( + usbmon.chatter.dump_bytes( + packet.direction, + message, + prefix=f"[{message_type_str}]", + print_empty=True, + ), + "\n", + ) + + +if __name__ == "__main__": + main() diff --git a/freestyle_hid/tools/hid_console.py b/freestyle_hid/tools/hid_console.py new file mode 100755 index 0000000..b3b3fee --- /dev/null +++ b/freestyle_hid/tools/hid_console.py @@ -0,0 +1,80 @@ +#!/usr/bin/env python3 +# SPDX-FileCopyrightText: © 2019 The freestyle-hid Authors +# SPDX-License-Identifier: Apache-2.0 +"""CLI tool to send messages through FreeStyle HID protocol.""" + +import logging +import pathlib +import sys +from typing import Optional + +import click +import click_log + +import freestyle_hid + +logger = logging.getLogger() +click_log.basic_config(logger) + + +@click.command() +@click_log.simple_verbosity_option(logger, "--vlog") +@click.option( + "--text-command-type", + "-c", + type=int, + default=0x60, + help="Message type for text commands sent to the device.", +) +@click.option( + "--text-reply-type", + "-r", + type=int, + default=0x60, + help="Message type for text replies received from the device.", +) +@click.option( + "--product-id", + "-p", + type=int, + help="Optional product ID (in alternative to the device path)", +) +@click.argument( + "device-path", + type=click.Path(exists=True, dir_okay=False, writable=True, allow_dash=False), + callback=lambda ctx, param, value: pathlib.Path(value) if value else None, + required=False, +) +def main( + *, + text_command_type: int, + text_reply_type: int, + product_id: Optional[int], + device_path: Optional[pathlib.Path], +): + if not product_id and not device_path: + raise click.UsageError( + "One of --product-id or DEVICE_PATH need to be provided." + ) + + session = freestyle_hid.Session( + product_id, device_path, text_command_type, text_reply_type + ) + + session.connect() + + while True: + if sys.stdin.isatty(): + command = input(">>> ") + else: + command = input() + print(f">>> {command}") + + try: + print(session.send_text_command(bytes(command, "ascii"))) + except freestyle_hid.CommandError as error: + print(f"! {error!r}") + + +if __name__ == "__main__": + main() diff --git a/freestyle_hid/tools/py.typed b/freestyle_hid/tools/py.typed new file mode 100644 index 0000000..311e481 --- /dev/null +++ b/freestyle_hid/tools/py.typed @@ -0,0 +1,2 @@ +# SPDX-FileCopyrightText: © 2020 The freestyle-hid Authors +# SPDX-License-Identifier: Apache-2.0 -- cgit v1.2.3