From 4ba7f6de9c9853be6767b8b30936ac0c7ee2f4b8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Diego=20Elio=20Petten=C3=B2?= Date: Sun, 19 Jun 2016 17:02:31 +0100 Subject: sdcodefree: new driver. This driver supports SD Codefree readers, that are sold in the UK by HomeHealth UK. --- README | 3 + glucometerutils/drivers/sdcodefree.py | 190 ++++++++++++++++++++++++++++++++++ 2 files changed, 193 insertions(+) create mode 100644 glucometerutils/drivers/sdcodefree.py diff --git a/README b/README index 161c94c..e1fba69 100644 --- a/README +++ b/README @@ -30,6 +30,7 @@ information on each of the devices. | LifeScan | OneTouch Select Plus | `otverio2015` | | Abbott | FreeStyle Optium | `freestyle_optium` | | Roche | Accu-Chek Mobile | `accuchek_reports` | +| SD Biosensor | SD CodeFree | `sdcodefree` | ### Driver features @@ -40,6 +41,7 @@ information on each of the devices. | `otverio2015` | serialno, swver | get and set | no | yes | | `freestyle_optium` | serialno, swver, unit | get and set | not supported by device | not supported by device | | `accuchek_reports` | serialno, unit | no | yes | not supported by device | +| `sdcodefree` | none | get | pre-/post-meal notes | not supported by device | ### Driver dependencies @@ -49,6 +51,7 @@ information on each of the devices. | `otultraeasy` | [pyserial] | | `otverio2015` | [python-scsi] | | `freestyle_optium` | [pyserial] | +| `sdcodefree` | [pyserial] | [pyserial]: https://pythonhosted.org/pyserial/ [python-scsi]: https://github.com/rosjat/python-scsi diff --git a/glucometerutils/drivers/sdcodefree.py b/glucometerutils/drivers/sdcodefree.py new file mode 100644 index 0000000..ec232dd --- /dev/null +++ b/glucometerutils/drivers/sdcodefree.py @@ -0,0 +1,190 @@ +# -*- coding: utf-8 -*- +"""Driver for SD CodeFree devices by SD Biosensor""" + +__author__ = 'Diego Elio Pettenò' +__email__ = 'flameeyes@flameeyes.eu' +__copyright__ = 'Copyright © 2016, Diego Elio Pettenò' +__license__ = 'MIT' + +import array +import collections +import datetime +import functools +import operator +import struct +import time + +import serial + +from glucometerutils import common +from glucometerutils import exceptions + +_STX = 0x53 # Not really 'STX' +_ETX = 0xAA # Not really 'ETX' + +_DIR_IN = 0x20 +_DIR_OUT = 0x10 + +_IDX_STX = 0 +_IDX_DIRECTION = 1 +_IDX_LENGTH = 2 +_IDX_CHECKSUM = -2 +_IDX_ETX = -1 + +_RECV_PREAMBLE = b'\x53\x20' + +_CHALLENGE_PACKET_FULL = b'\x53\x20\x04\x10\x30\x20\xAA' +_RESPONSE_PACKET = b'\x10\x40' + +_DATE_SET_PACKET = b'\x10\x10' + +_DISCONNECT_PACKET = b'\x10\x60' +_DISCONNECTED_PACKET = b'\x10\x70' + +_STRUCT_READINGS_COUNT = struct.Struct('>H') + +_FETCH_PACKET = b'\x10\x60' + +_ReadingRecord = collections.namedtuple( + '_ReadingRecord', + ('unknown1', 'unknown2', 'year', 'month', 'day', 'hour', 'minute', + 'value', 'meal_flag')) +_STRUCT_READING = struct.Struct('>BBBBBBBHB') + +_MEAL_FLAG = { + 0x00: common.NO_MEAL, + 0x10: common.BEFORE_MEAL, + 0x20: common.AFTER_MEAL +} + +def parse_reading(msgdata): + return _ReadingRecord(*_STRUCT_READING.unpack_from(msgdata)) + +def xor_checksum(msg): + return functools.reduce(operator.xor, msg) + +class Device(object): + def __init__(self, device): + self.serial_ = serial.Serial( + port=device, baudrate=38400, bytesize=serial.EIGHTBITS, + parity=serial.PARITY_NONE, stopbits=serial.STOPBITS_ONE, + timeout=300, xonxoff=False, rtscts=False, dsrdtr=False, + writeTimeout=None) + + def read_packet(self): + preamble = self.serial_.read(3) + if len(preamble) != 3: + raise exceptione.InvalidResponse( + response='Expected 3 bytes, received %d' % len(preamble)) + if preamble[0:_IDX_LENGTH] != _RECV_PREAMBLE: + raise exceptions.InvalidResponse( + response='Unexpected preamble %r' % pramble[0:_IDX_LENGTH]) + + msglen = preamble[_IDX_LENGTH] + message = self.serial_.read(msglen) + if len(message) != msglen: + raise exception.InvalidResponse( + response='Expected %d bytes, received %d' % (msglen, len(message))) + if message[_IDX_ETX] != _ETX: + raise exception.InvalidResponse( + response='Unexpected end-of-transmission byte: %02x' % message[_IDX_ETX]) + + # Calculate the checksum up until before the checksum itself. + msgdata = message[:_IDX_CHECKSUM] + + cksum = xor_checksum(msgdata) + if cksum != message[_IDX_CHECKSUM]: + raise exception.InvalidChecksum(message[_IDX_CHECKSUM], cksum) + + return msgdata + + def wait_and_ready(self): + challenge = self.serial_.read(1) + + # The first packet read may have a prefixed zero, it might be a bug in the + # cp210x driver or device, but discard it if found. + if challenge == b'\0': + challege = self.serial_.read(1) + if challenge != b'\x53': + raise exceptions.ConnectionFailed( + message='Unexpected starting bytes %r' % challenge) + + challenge += self.serial_.read(6) + + if challenge != _CHALLENGE_PACKET_FULL: + raise exceptions.ConnectionFailed( + message='Unexpected challenge %r' % challenge) + + self.send_packet(_RESPONSE_PACKET) + + # The first packet only contains the counter of how many readings are + # available. + first_packet = self.read_packet() + + count = _STRUCT_READINGS_COUNT.unpack_from(first_packet, 1) + + return count[0] + + def send_packet(self, msgdata): + packet = array.array('B') + packet.extend((_STX, _DIR_OUT, len(msgdata)+2)) + packet.extend(msgdata) + packet.extend((xor_checksum(msgdata), _ETX)) + self.serial_.write(packet.tobytes()) + + def connect(self): + print("Please connect and turn on the device.") + + def disconnect(self): + self.send_packet(_DISCONNECT_PACKET) + response = self.read_packet() + if response != _DISCONNECTED_PACKET: + raise exceptions.InvalidResponse(response=response) + + def get_information_string(self): + return ('SD CodeFree glucometer\n') + + def get_version(self): + raise NotImplementedError + + def get_serial_number(self): + raise NotImplementedError + + def get_glucose_unit(self): + # Device does not provide information on glucose unit. + return common.UNIT_MGDL + + def get_datetime(self): + raise NotImplementedError + + def set_datetime(self, date=datetime.datetime.now()): + setdatecmd = date.strftime('ADATE%Y%m%d%H%M').encode('ascii') + + # Ignore the readings count. + self.wait_and_ready() + + self.send_packet(setdatecmd) + response = self.read_packet() + if response != _DATE_SET_PACKET: + raise exceptions.InvalidResponse(response=response) + + # The date we return should only include up to minute, unfortunately. + return datetime.datetime(date.year, date.month, date.day, + date.hour, date.minute) + + def zero_log(self): + raise NotmplementedError + + def get_readings(self): + count = self.wait_and_ready() + + for _ in range(count): + self.send_packet(_FETCH_PACKET) + rpkt = self.read_packet() + + r = parse_reading(rpkt) + meal = _MEAL_FLAG[r.meal_flag] + + yield common.Reading( + datetime.datetime(2000 + r.year, r.month, r.day, r.hour, r.minute), + r.value, meal=meal) -- cgit v1.2.3