From d273113e7fef07fd95f50e63dae76e140e80d8f1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Diego=20Elio=20Petten=C3=B2?= Date: Sun, 16 May 2021 15:08:40 +0100 Subject: Use the new APIs from usbmon-tools to simplify chatter extraction. The HID support module, together with the search by IDs make it easier to write a reliable extractor. --- freestyle_hid/tools/extract_chatter.py | 68 ++++++++++------------------------ 1 file changed, 20 insertions(+), 48 deletions(-) (limited to 'freestyle_hid') diff --git a/freestyle_hid/tools/extract_chatter.py b/freestyle_hid/tools/extract_chatter.py index a77a0ec..b2c16f6 100755 --- a/freestyle_hid/tools/extract_chatter.py +++ b/freestyle_hid/tools/extract_chatter.py @@ -16,6 +16,7 @@ import construct import usbmon import usbmon.chatter import usbmon.pcapng +import usbmon.support.hid logger = logging.getLogger() click_log.basic_config(logger) @@ -49,7 +50,6 @@ _ABBOTT_VENDOR_ID = 0x1A61 _LIBRE2_PRODUCT_ID = 0x3950 _ENCRYPTED_MESSAGE = construct.Struct( - message_type=construct.Byte, encrypted_message=construct.Bytes(64 - 1 - 4 - 4), sequence_number=construct.Int32ul, mac=construct.Int32ul, @@ -107,14 +107,15 @@ def main( session = usbmon.pcapng.parse_stream(pcap_file, retag_urbs=False) if not device_address: - for descriptor in session.device_descriptors.values(): - if descriptor.vendor_id == _ABBOTT_VENDOR_ID: - if device_address and device_address != descriptor.address: - raise Exception( - "Multiple Abbott device present in capture, please" - " provide a --device-address flag." - ) - device_address = descriptor.address + possible_addresses = list(session.find_devices_by_ids(_ABBOTT_VENDOR_ID, None)) + if len(possible_addresses) > 1: + raise click.UsageError( + f"Multiple Abbott devices found, please select one of {','.join(possible_addresses)}" + ) + elif len(possible_addresses) == 0: + raise click.UsageError("No Abbott devices found.") + else: + (device_address,) = possible_addresses if device_address not in session.device_descriptors: logging.warning( @@ -128,38 +129,9 @@ def main( if descriptor.product_id == _LIBRE2_PRODUCT_ID: encrypted_protocol = True - for first, second in session.in_pairs(): - # Ignore stray callbacks/errors. - if not first.type == usbmon.constants.PacketType.SUBMISSION: - continue - - if not first.address.startswith(f"{device_address}."): - # No need to check second, they will be linked. - continue - - if first.xfer_type == usbmon.constants.XferType.INTERRUPT: - pass - elif ( - first.xfer_type == usbmon.constants.XferType.CONTROL - and not first.setup_packet - or first.setup_packet.type == usbmon.setup.Type.CLASS # type: ignore - ): - pass - else: - continue - - if first.direction == usbmon.constants.Direction.OUT: - packet = first - else: - assert second is not None - packet = second - - if not packet.payload: - continue - - assert len(packet.payload) >= 2 - - message_type = packet.payload[0] + for packet in usbmon.support.hid.select(session, device_address=device_address): + assert packet.report_content + message_type = packet.report_id if message_type == _KEEPALIVE_TYPE and not print_keepalive: continue @@ -170,7 +142,7 @@ def main( # With encrypted communication, the length of the message is also encrypted, # and all the packets use the full 64 bytes. So instead, we extract what # metadata we can. - parsed = _ENCRYPTED_MESSAGE.parse(packet.payload) + parsed = _ENCRYPTED_MESSAGE.parse(packet.report_content) message_metadata.extend( [f"SEQUENCE_NUMBER={parsed.sequence_number}", f"MAC={parsed.mac:04x}"] ) @@ -178,9 +150,9 @@ def main( message_type_str = f"x{message_type:02x}" message = parsed.encrypted_message elif verbose_encryption_setup and message_type in _ENCRYPTION_SETUP_TYPES: - message_length = packet.payload[1] - message_end_idx = 2 + message_length - message = packet.payload[2:message_end_idx] + message_length = packet.report_content[0] + message_end_idx = 1 + message_length + message = packet.report_content[1:message_end_idx] if message[0] == _START_AUTHORIZE_CMD: message_metadata.append("START_AUTHORIZE") @@ -204,11 +176,11 @@ def main( message_metadata.append(f"RAW_LENGTH={message_length}") message_type_str = f" {message_type:02x}" else: - message_length = packet.payload[1] + message_length = packet.report_content[0] message_metadata.append(f"LENGTH={message_length}") - message_end_idx = 2 + message_length + message_end_idx = 1 + message_length message_type_str = f" {message_type:02x}" - message = packet.payload[2:message_end_idx] + message = packet.report_content[1:message_end_idx] if message_metadata: metadata_string = "\n".join( -- cgit v1.2.3