From 4ec3a8e678d4a3889921138fe83970aba39992f0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Diego=20Elio=20Petten=C3=B2?= Date: Sun, 31 Dec 2017 21:48:49 +0000 Subject: sdcodefree: rewrite using construct and improve readability. This replaces the use of manual structures with well-defined construct entries; it also give consistency to packet vs message. Log input and output messages, to be clearer. --- README | 30 +++---- glucometerutils/drivers/sdcodefree.py | 165 ++++++++++++++++------------------ setup.py | 2 +- 3 files changed, 94 insertions(+), 103 deletions(-) diff --git a/README b/README index 1f42487..a5dcd32 100644 --- a/README +++ b/README @@ -32,21 +32,21 @@ $ . glucometerutils-venv/bin/activate Please see the following table for the driver for each device that is known and supported. -| Manufacturer | Model Name | Driver | Dependencies | -| --- | --- | --- | --- | -| LifeScan | OneTouch Ultra 2 | `otultra2` | [pyserial] | -| LifeScan | OneTouch Ultra Easy | `otultraeasy` | [pyserial] | -| LifeScan | OneTouch Ultra Mini | `otultraeasy` | [pyserial] | -| LifeScan | OneTouch Verio (USB) | `otverio2015` | [python-scsi] | -| LifeScan | OneTouch Select Plus | `otverio2015` | [python-scsi] | -| Abbott | FreeStyle InsuLinx† | `fsinsulinx` | [construct] [hidapi]‡ | -| Abbott | FreeStyle Libre | `fslibre` | [construct] [hidapi]‡ | -| Abbott | FreeStyle Optium | `fsoptium` | [pyserial] | -| Abbott | FreeStyle Precision Neo | `fsprecisionneo` | [construct] [hidapi]‡ | -| Abbott | FreeStyle Optium Neo | `fsprecisionneo` | [construct] [hidapi]‡ | -| Abbott | FreeStyle Optium Neo H | `fsprecisionneo` | [construct] [hidapi]‡ | -| Roche | Accu-Chek Mobile | `accuchek_reports` | | -| SD Biosensor | SD CodeFree | `sdcodefree` | [pyserial] | +| Manufacturer | Model Name | Driver | Dependencies | +| --- | --- | --- | --- | +| LifeScan | OneTouch Ultra 2 | `otultra2` | [pyserial] | +| LifeScan | OneTouch Ultra Easy | `otultraeasy` | [pyserial] | +| LifeScan | OneTouch Ultra Mini | `otultraeasy` | [pyserial] | +| LifeScan | OneTouch Verio (USB) | `otverio2015` | [python-scsi] | +| LifeScan | OneTouch Select Plus | `otverio2015` | [python-scsi] | +| Abbott | FreeStyle InsuLinx† | `fsinsulinx` | [construct] [hidapi]‡ | +| Abbott | FreeStyle Libre | `fslibre` | [construct] [hidapi]‡ | +| Abbott | FreeStyle Optium | `fsoptium` | [pyserial] | +| Abbott | FreeStyle Precision Neo | `fsprecisionneo` | [construct] [hidapi]‡ | +| Abbott | FreeStyle Optium Neo | `fsprecisionneo` | [construct] [hidapi]‡ | +| Abbott | FreeStyle Optium Neo H | `fsprecisionneo` | [construct] [hidapi]‡ | +| Roche | Accu-Chek Mobile | `accuchek_reports` | | +| SD Biosensor | SD CodeFree | `sdcodefree` | [construct] [pyserial] | † Untested. ‡ Optional dependency on Linux; required on other operating systems. diff --git a/glucometerutils/drivers/sdcodefree.py b/glucometerutils/drivers/sdcodefree.py index 4a375bd..2d145cc 100644 --- a/glucometerutils/drivers/sdcodefree.py +++ b/glucometerutils/drivers/sdcodefree.py @@ -19,96 +19,83 @@ __email__ = 'flameeyes@flameeyes.eu' __copyright__ = 'Copyright © 2017, Diego Elio Pettenò' __license__ = 'MIT' -import array -import collections +import binascii import datetime +import enum import functools import logging import operator -import struct -import time + +import construct from glucometerutils import common from glucometerutils import exceptions from glucometerutils.support import serial -_STX = 0x53 # Not really 'STX' -_ETX = 0xAA # Not really 'ETX' - -_DIR_IN = 0x20 -_DIR_OUT = 0x10 - -_IDX_STX = 0 -_IDX_DIRECTION = 1 -_IDX_LENGTH = 2 -_IDX_CHECKSUM = -2 -_IDX_ETX = -1 +def xor_checksum(msg): + return functools.reduce(operator.xor, msg) -_RECV_PREAMBLE = b'\x53\x20' +class Direction(enum.Enum): + In = 0x20 + Out = 0x10 + +_PACKET = construct.Struct( + 'stx' / construct.Const(construct.Byte, 0x53), + 'direction' / construct.SymmetricMapping( + construct.Byte, + {e: e.value for e in Direction}), + 'length' / construct.Rebuild( + construct.Byte, lambda ctx: len(ctx.message) + 2), + 'message' / construct.Bytes(length=lambda ctx: ctx.length - 2), + 'checksum' / construct.Checksum( + construct.Byte, xor_checksum, construct.this.message), + 'etx' / construct.Const(construct.Byte, 0xAA) +) + +_FIRST_MESSAGE = construct.Struct( + construct.Const(construct.Byte, 0x30), + 'count' / construct.Int16ub, + construct.Const(construct.Byte, 0xAA)[19]) _CHALLENGE_PACKET_FULL = b'\x53\x20\x04\x10\x30\x20\xAA' -_RESPONSE_PACKET = b'\x10\x40' - -_DATE_SET_PACKET = b'\x10\x10' +_RESPONSE_MESSAGE = b'\x10\x40' -_DISCONNECT_PACKET = b'\x10\x60' -_DISCONNECTED_PACKET = b'\x10\x70' +_DATE_SET_MESSAGE = b'\x10\x10' -_STRUCT_READINGS_COUNT = struct.Struct('>H') +_DISCONNECT_MESSAGE = b'\x10\x60' +_DISCONNECTED_MESSAGE = b'\x10\x70' -_FETCH_PACKET = b'\x10\x60' - -_ReadingRecord = collections.namedtuple( - '_ReadingRecord', - ('unknown1', 'unknown2', 'year', 'month', 'day', 'hour', 'minute', - 'value', 'meal_flag')) -_STRUCT_READING = struct.Struct('>BBBBBBBHB') +_FETCH_MESSAGE = b'\x10\x60' _MEAL_FLAG = { - 0x00: common.Meal.NONE, - 0x10: common.Meal.BEFORE, - 0x20: common.Meal.AFTER, + common.Meal.NONE: 0x00, + common.Meal.BEFORE: 0x10, + common.Meal.AFTER: 0x20, } -def parse_reading(msgdata): - return _ReadingRecord(*_STRUCT_READING.unpack_from(msgdata)) +_READING = construct.Struct( + construct.Byte[2], + 'year' / construct.Byte, + 'month' / construct.Byte, + 'day' / construct.Byte, + 'hour' / construct.Byte, + 'minute' / construct.Byte, + 'value' / construct.Int16ub, + 'meal' / construct.SymmetricMapping( + construct.Byte, _MEAL_FLAG), + construct.Byte[7], +) -def xor_checksum(msg): - return functools.reduce(operator.xor, msg) class Device(serial.SerialDevice): BAUDRATE = 38400 DEFAULT_CABLE_ID = '10c4:ea60' # Generic cable. TIMEOUT = 300 # We need to wait for data from the device. - def read_packet(self): - preamble = self.serial_.read(3) - if len(preamble) != 3: - raise exceptione.InvalidResponse( - response='Expected 3 bytes, received %d' % len(preamble)) - if preamble[0:_IDX_LENGTH] != _RECV_PREAMBLE: - raise exceptions.InvalidResponse( - response='Unexpected preamble %r' % pramble[0:_IDX_LENGTH]) - - msglen = preamble[_IDX_LENGTH] - message = self.serial_.read(msglen) - if len(message) != msglen: - raise exception.InvalidResponse( - response='Expected %d bytes, received %d' % - (msglen, len(message))) - if message[_IDX_ETX] != _ETX: - raise exception.InvalidResponse( - response='Unexpected end-of-transmission byte: %02x' % - message[_IDX_ETX]) - - # Calculate the checksum up until before the checksum itself. - msgdata = message[:_IDX_CHECKSUM] - - cksum = xor_checksum(msgdata) - if cksum != message[_IDX_CHECKSUM]: - raise exception.InvalidChecksum(message[_IDX_CHECKSUM], cksum) - - return msgdata + def read_message(self): + pkt = _PACKET.parse_stream(self.serial_) + logging.debug('received packet: %r', pkt) + return pkt.message def wait_and_ready(self): challenge = self.serial_.read(1) @@ -116,6 +103,7 @@ class Device(serial.SerialDevice): # The first packet read may have a prefixed zero, it might be a bug in # the cp210x driver or device, but discard it if found. if challenge == b'\0': + logging.debug('spurious null byte received') challege = self.serial_.read(1) if challenge != b'\x53': raise exceptions.ConnectionFailed( @@ -127,30 +115,33 @@ class Device(serial.SerialDevice): raise exceptions.ConnectionFailed( message='Unexpected challenge %r' % challenge) - self.send_packet(_RESPONSE_PACKET) + logging.debug( + 'challenge packet received: %s', binascii.hexlify(challenge)) + + self.send_message(_RESPONSE_MESSAGE) # The first packet only contains the counter of how many readings are # available. - first_packet = self.read_packet() - - count = _STRUCT_READINGS_COUNT.unpack_from(first_packet, 1) + first_message = _FIRST_MESSAGE.parse(self.read_message()) + logging.debug('received first message: %r', first_message) - return count[0] + return first_message.count - def send_packet(self, msgdata): - packet = array.array('B') - packet.extend((_STX, _DIR_OUT, len(msgdata)+2)) - packet.extend(msgdata) - packet.extend((xor_checksum(msgdata), _ETX)) - self.serial_.write(packet.tobytes()) + def send_message(self, message): + pkt = _PACKET.build({ + 'message': message, + 'direction': Direction.Out + }) + logging.debug('sending packet: %s', binascii.hexlify(pkt)) + self.serial_.write(pkt) def connect(self): print("Please connect and turn on the device.") def disconnect(self): - self.send_packet(_DISCONNECT_PACKET) - response = self.read_packet() - if response != _DISCONNECTED_PACKET: + self.send_message(_DISCONNECT_MESSAGE) + response = self.read_message() + if response != _DISCONNECTED_MESSAGE: raise exceptions.InvalidResponse(response=response) def get_meter_info(self): @@ -175,9 +166,9 @@ class Device(serial.SerialDevice): # Ignore the readings count. self.wait_and_ready() - self.send_packet(setdatecmd) - response = self.read_packet() - if response != _DATE_SET_PACKET: + self.send_message(setdatecmd) + response = self.read_message() + if response != _DATE_SET_MESSAGE: raise exceptions.InvalidResponse(response=response) # The date we return should only include up to minute, unfortunately. @@ -185,19 +176,19 @@ class Device(serial.SerialDevice): date.hour, date.minute) def zero_log(self): - raise NotmplementedError + raise NotImplementedError def get_readings(self): count = self.wait_and_ready() for _ in range(count): - self.send_packet(_FETCH_PACKET) - rpkt = self.read_packet() + self.send_message(_FETCH_MESSAGE) + message = self.read_message() - r = parse_reading(rpkt) - meal = _MEAL_FLAG[r.meal_flag] + r = _READING.parse(message) + logging.debug('received reading: %r', r) yield common.GlucoseReading( datetime.datetime( 2000 + r.year, r.month, r.day, r.hour, r.minute), - r.value, meal=meal) + r.value, meal=r.meal) diff --git a/setup.py b/setup.py index c49745a..a9b7c18 100644 --- a/setup.py +++ b/setup.py @@ -56,7 +56,7 @@ setup( 'fsoptium': ['pyserial'], 'fsprecisionneo': ['construct', 'hidapi'], 'accucheck_reports': [], - 'sdcodefree': ['pyserial'], + 'sdcodefree': ['construct', 'pyserial'], }, entry_points = { 'console_scripts': [ -- cgit v1.2.3