From 260a3935d0124097c6d54addd7ccd103dc890623 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Diego=20Elio=20Petten=C3=B2?= Date: Mon, 16 Jan 2017 22:13:59 +0000 Subject: Add new driver for the Abbott FreeStyle Precision Neo. This driver uses the HID-based protocol which is shared among different devices. --- README | 25 ++-- glucometerutils/drivers/fsprecisionneo.py | 65 ++++++++++ glucometerutils/support/freestyle.py | 192 ++++++++++++++++++++++++++++++ 3 files changed, 271 insertions(+), 11 deletions(-) create mode 100644 glucometerutils/drivers/fsprecisionneo.py create mode 100644 glucometerutils/support/freestyle.py diff --git a/README b/README index 4d311fb..d5fe406 100644 --- a/README +++ b/README @@ -21,16 +21,17 @@ Please see the following table for the driver for each device that is known and supported; the following table will provide further information on each of the devices. -| Manufacturer | Model Name | Driver | -| --- | --- | --- | -| LifeScan | OneTouch Ultra 2 | `otultra2` | -| LifeScan | OneTouch Ultra Easy | `otultraeasy` | -| LifeScan | OneTouch Ultra Mini | `otultraeasy` | -| LifeScan | OneTouch Verio (USB) | `otverio2015` | -| LifeScan | OneTouch Select Plus | `otverio2015` | -| Abbott | FreeStyle Optium | `fsoptium` | -| Roche | Accu-Chek Mobile | `accuchek_reports` | -| SD Biosensor | SD CodeFree | `sdcodefree` | +| Manufacturer | Model Name | Driver | +| --- | --- | --- | +| LifeScan | OneTouch Ultra 2 | `otultra2` | +| LifeScan | OneTouch Ultra Easy | `otultraeasy` | +| LifeScan | OneTouch Ultra Mini | `otultraeasy` | +| LifeScan | OneTouch Verio (USB) | `otverio2015` | +| LifeScan | OneTouch Select Plus | `otverio2015` | +| Abbott | FreeStyle Optium | `fsoptium` | +| Abbott | FreeStyle Precision Neo | `fsprecisionneo` | +| Roche | Accu-Chek Mobile | `accuchek_reports` | +| SD Biosensor | SD CodeFree | `sdcodefree` | ### Driver features @@ -40,6 +41,7 @@ information on each of the devices. | `otultraeasy` | serialno, swver, unit | get and set | not supported by device | yes | | `otverio2015` | serialno, swver | get and set | no | yes | | `fsoptium` | serialno, swver, unit | get and set | not supported by device | not supported by device | +| `fsprecisionneo` | serialno, swver | get | not supported by device | not supported by device | | `accuchek_reports` | serialno, unit | no | yes | not supported by device | | `sdcodefree` | none | set only | pre-/post-meal notes | not supported by device | @@ -51,6 +53,7 @@ information on each of the devices. | `otultraeasy` | [pyserial] | | `otverio2015` | [python-scsi] | | `fsoptium` | [pyserial] | +| `fsprecisionneo` | | | `sdcodefree` | [pyserial] | [pyserial]: https://pythonhosted.org/pyserial/ @@ -89,7 +92,7 @@ compatible. License ------- -Copyright © 2013-2016 Diego Elio Pettenò +Copyright © 2013-2017 Diego Elio Pettenò Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the diff --git a/glucometerutils/drivers/fsprecisionneo.py b/glucometerutils/drivers/fsprecisionneo.py new file mode 100644 index 0000000..9e9cea1 --- /dev/null +++ b/glucometerutils/drivers/fsprecisionneo.py @@ -0,0 +1,65 @@ +# -*- coding: utf-8 -*- +"""Driver for FreeStyle Precision Neo devices. +""" + +import collections +import datetime + +from glucometerutils import common +from glucometerutils.support import freestyle + + +# The type is a string because it precedes the parsing of the object. +_TYPE_GLUCOSE_READING = '7' + +_NeoReading = collections.namedtuple('_NeoReading', ( + 'type', # 7 = blood glucose + 'id', + 'month', 'day', 'year', # year is two-digits + 'hour', 'minute', + 'unknown2', + 'value', + 'unknown3', 'unknown4', 'unknown5', 'unknown6', 'unknown7', + 'unknown8', 'unknown9', 'unknown10', 'unknown11', 'unknown12', +)) + + +class Device(freestyle.FreeStyleHidDevice): + """Glucometer driver for FreeStyle Precision Neo devices.""" + + def get_meter_info(self): + """Return the device information in structured form.""" + return common.MeterInfo( + 'FreeStyle Precision Neo', + serial_number=self.get_serial_number(), + version_info=( + 'Software version: ' + self._get_version(),), + native_unit=self.get_glucose_unit()) + + def _get_version(self): + """Return the software version of the device.""" + return self._send_text_command(b'$swver?').rstrip('\r\n') + + def get_serial_number(self): + """Returns the serial number of the device.""" + return self._send_text_command(b'$serlnum?').rstrip('\r\n') + + def get_glucose_unit(self): + """Returns the glucose unit of the device.""" + return common.UNIT_MGDL + + def get_readings(self): + """Iterate through the reading records in the device.""" + for record in self._get_multirecord(b'$record?'): + if not record or record[0] != _TYPE_GLUCOSE_READING: + continue + + # Build a _NeoReading object by parsing each of the entries in the + # CSV as integers. + raw_reading = _NeoReading._make([int(v) for v in record]) + + timestamp = datetime.datetime( + raw_reading.year + 2000, raw_reading.month, raw_reading.day, + raw_reading.hour, raw_reading.minute) + + yield common.Reading(timestamp, raw_reading.value) diff --git a/glucometerutils/support/freestyle.py b/glucometerutils/support/freestyle.py new file mode 100644 index 0000000..bef695e --- /dev/null +++ b/glucometerutils/support/freestyle.py @@ -0,0 +1,192 @@ +"""Common routines to implement the FreeStyle common protocol. + +Protocol documentation available at +https://flameeyes.github.io/glucometer-protocols/abbott/shared-hid-protocol.html + +""" + +import csv +import datetime +import re +import struct + +from glucometerutils import exceptions + +# Sequence of initialization messages sent to the device to establish HID +# protocol. +_INIT_SEQUENCE = (0x04, 0x05, 0x15, 0x01) + +_STRUCT_PREAMBLE = struct.Struct('.+\r\n)CKSM:(?P[0-9A-F]{8})\r\n' + 'CMD OK\r\n$', re.DOTALL) + +_MULTIRECORDS_FORMAT = re.compile( + '^(?P.+\r\n)(?P[0-9]+),(?P[0-9A-F]{8})\r\n$', + re.DOTALL) + + +def _verify_checksum(message, expected_checksum_hex): + """Calculate the simple checksum of the message and compare with expected. + + Args: + message: (str) message to calculate the checksum of. + expected_checksum_hex: (str) hexadecimal string representing the checksum + expected to match the message. + + Raises: + InvalidChecksum: if the message checksum calculated does not match the one + received. + """ + expected_checksum = int(expected_checksum_hex, 16) + calculated_checksum = sum(ord(c) for c in message) + + if expected_checksum != calculated_checksum: + raise exceptions.InvalidChecksum(expected_checksum, calculated_checksum) + + +class FreeStyleHidDevice(object): + """Base class implementing the FreeStyle HID common protocol. + + This class implements opening, initializing the connection and sending + commands to the device, reading the response and confirming the checksums. + + Commands sent to the devices over this protocol have a "message type" + prefixed to the command itself. Text command are usually sent with message + type 0x60, and the replied received with the same. Some devices may diverge + though. + """ + + TEXT_CMD = 0x60 + TEXT_REPLY_CMD = 0x60 + + def __init__(self, device): + self.handle_ = open(device, 'w+b') + + def connect(self): + """Open connection to the device, starting the knocking sequence.""" + for message in _INIT_SEQUENCE: + self._send_command(message, b'') + # Ignore the returned values, they are not generally useful. The + # Serial Number as returned may not actually match the device's + # serial number (e.g. in the FreeStyle Precision Neo). + self._read_response() + + def disconnect(self): + """Disconnect the device, nothing to be done.""" + pass + + def _send_command(self, message_type, command): + """Send a raw command to the device. + + Args: + message_type: (int) The first byte sent with the report to the device. + command: (bytes) The command to send out the device. + """ + cmdlen = len(command) + assert cmdlen <= 62 + + # First byte in the written buffer is the report number, on Linux HID + # interface. + usb_packet = b'\x00' + _STRUCT_PREAMBLE.pack( + message_type, cmdlen) + command + bytes(62 - cmdlen) + + if self.handle_.write(usb_packet) < 0: + raise exceptions.InvalidResponse() + + def _read_response(self): + """Read the response from the device and extracts it.""" + usb_packet = self.handle_.read(64) + + assert usb_packet + message_type = usb_packet[0] + message_length = usb_packet[1] + message_content = usb_packet[2:2+message_length] + + # There appears to be a stray number of 22 01 03 messages being returned + # by some devices after commands are sent. These do not appear to have + # meaning, so ignore them and proceed to the next. + if message_type == 0x22 and message_length == 1: + return self._read_response() + + return (message_type, message_content) + + def _send_text_command(self, command): + """Send a command to the device that expects a text reply.""" + self._send_command(self.TEXT_CMD, command) + + # Reply can stretch multiple buffers + full_content = b'' + while _TEXT_REPLY_COMPLETED not in full_content: + message_type, content = self._read_response() + + # Check for message failure in all the message types. + if _TEXT_REPLY_FAILURE in content: + raise exceptions.InvalidResponse( + 'Command failure: %s' % content.decode('ascii')) + + if message_type != self.TEXT_REPLY_CMD: + raise exceptions.InvalidResponse( + 'Message type %02x does not match expectations: %s' % + (message_type, content.decode('ascii'))) + + full_content += content + + decoded_message = full_content.decode('ascii') + + match = _TEXT_REPLY_FORMAT.search(decoded_message) + if not match: + raise exceptions.InvalidResponse(decoded_message) + + message = match.group('message') + _verify_checksum(message, match.group('checksum')) + + return message + + # Some of the commands are also shared across devices that use this HID + # protocol, but not many. Only provide here those that do seep to change + # between them. + def get_datetime(self): + """Gets the date and time as reported by the device. + + This is one of the few commands that appear common to many of the + FreeStyle devices that use the HID framing protocol. + """ + date = self._send_text_command(b'$date?').rstrip('\r\n') + time = self._send_text_command(b'$time?').rstrip('\r\n') + + # Year is returned as an offset to 2000. + month, day, year = (int(x) for x in date.split(',')) + hour, minute = (int(x) for x in time.split(',')) + + return datetime.datetime(year + 2000, month, day, hour, minute) + + def _get_multirecord(self, command): + """Queries for, and returns, "multirecords" results. + + Multirecords are used for querying events, readings, history and similar + other data out of a FreeStyle device. These are comma-separated values, + variable-length. + + The validation includes the general HID framing parsing, as well as + validation of the record count, and of the embedded records checksum. + + Args: + command: (bytes) the text command to send to the device for the query. + + Returns: + (csv.reader): a CSV reader object that returns a record for each line + in the record file. + """ + message = self._send_text_command(command) + match = _MULTIRECORDS_FORMAT.search(message) + if not match: + raise exceptions.InvalidResponse(message) + + records_str = match.group('message') + _verify_checksum(records_str, match.group('checksum')) + + return csv.reader(records_str.split('\r\n')) -- cgit v1.2.3