From 81f94cb34cb0482339fe4f0934e2d0ecfa0a21e4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Diego=20Elio=20Petten=C3=B2?= Date: Thu, 9 Feb 2017 22:00:30 +0000 Subject: sdcodefree: reformat and cleanup. --- glucometerutils/drivers/sdcodefree.py | 273 +++++++++++++++++----------------- 1 file changed, 138 insertions(+), 135 deletions(-) diff --git a/glucometerutils/drivers/sdcodefree.py b/glucometerutils/drivers/sdcodefree.py index e7ce753..c89d894 100644 --- a/glucometerutils/drivers/sdcodefree.py +++ b/glucometerutils/drivers/sdcodefree.py @@ -3,7 +3,7 @@ __author__ = 'Diego Elio Pettenò' __email__ = 'flameeyes@flameeyes.eu' -__copyright__ = 'Copyright © 2016, Diego Elio Pettenò' +__copyright__ = 'Copyright © 2017, Diego Elio Pettenò' __license__ = 'MIT' import array @@ -19,8 +19,8 @@ import serial from glucometerutils import common from glucometerutils import exceptions -_STX = 0x53 # Not really 'STX' -_ETX = 0xAA # Not really 'ETX' +_STX = 0x53 # Not really 'STX' +_ETX = 0xAA # Not really 'ETX' _DIR_IN = 0x20 _DIR_OUT = 0x10 @@ -46,145 +46,148 @@ _STRUCT_READINGS_COUNT = struct.Struct('>H') _FETCH_PACKET = b'\x10\x60' _ReadingRecord = collections.namedtuple( - '_ReadingRecord', - ('unknown1', 'unknown2', 'year', 'month', 'day', 'hour', 'minute', - 'value', 'meal_flag')) + '_ReadingRecord', + ('unknown1', 'unknown2', 'year', 'month', 'day', 'hour', 'minute', + 'value', 'meal_flag')) _STRUCT_READING = struct.Struct('>BBBBBBBHB') _MEAL_FLAG = { - 0x00: common.NO_MEAL, - 0x10: common.BEFORE_MEAL, - 0x20: common.AFTER_MEAL + 0x00: common.NO_MEAL, + 0x10: common.BEFORE_MEAL, + 0x20: common.AFTER_MEAL } def parse_reading(msgdata): - return _ReadingRecord(*_STRUCT_READING.unpack_from(msgdata)) + return _ReadingRecord(*_STRUCT_READING.unpack_from(msgdata)) def xor_checksum(msg): - return functools.reduce(operator.xor, msg) + return functools.reduce(operator.xor, msg) class Device(object): - def __init__(self, device): - self.serial_ = serial.Serial( - port=device, baudrate=38400, bytesize=serial.EIGHTBITS, - parity=serial.PARITY_NONE, stopbits=serial.STOPBITS_ONE, - timeout=300, xonxoff=False, rtscts=False, dsrdtr=False, - writeTimeout=None) - - def read_packet(self): - preamble = self.serial_.read(3) - if len(preamble) != 3: - raise exceptione.InvalidResponse( - response='Expected 3 bytes, received %d' % len(preamble)) - if preamble[0:_IDX_LENGTH] != _RECV_PREAMBLE: - raise exceptions.InvalidResponse( - response='Unexpected preamble %r' % pramble[0:_IDX_LENGTH]) - - msglen = preamble[_IDX_LENGTH] - message = self.serial_.read(msglen) - if len(message) != msglen: - raise exception.InvalidResponse( - response='Expected %d bytes, received %d' % (msglen, len(message))) - if message[_IDX_ETX] != _ETX: - raise exception.InvalidResponse( - response='Unexpected end-of-transmission byte: %02x' % message[_IDX_ETX]) - - # Calculate the checksum up until before the checksum itself. - msgdata = message[:_IDX_CHECKSUM] - - cksum = xor_checksum(msgdata) - if cksum != message[_IDX_CHECKSUM]: - raise exception.InvalidChecksum(message[_IDX_CHECKSUM], cksum) - - return msgdata - - def wait_and_ready(self): - challenge = self.serial_.read(1) - - # The first packet read may have a prefixed zero, it might be a bug in the - # cp210x driver or device, but discard it if found. - if challenge == b'\0': - challege = self.serial_.read(1) - if challenge != b'\x53': - raise exceptions.ConnectionFailed( - message='Unexpected starting bytes %r' % challenge) - - challenge += self.serial_.read(6) - - if challenge != _CHALLENGE_PACKET_FULL: - raise exceptions.ConnectionFailed( - message='Unexpected challenge %r' % challenge) - - self.send_packet(_RESPONSE_PACKET) - - # The first packet only contains the counter of how many readings are - # available. - first_packet = self.read_packet() - - count = _STRUCT_READINGS_COUNT.unpack_from(first_packet, 1) - - return count[0] - - def send_packet(self, msgdata): - packet = array.array('B') - packet.extend((_STX, _DIR_OUT, len(msgdata)+2)) - packet.extend(msgdata) - packet.extend((xor_checksum(msgdata), _ETX)) - self.serial_.write(packet.tobytes()) - - def connect(self): - print("Please connect and turn on the device.") - - def disconnect(self): - self.send_packet(_DISCONNECT_PACKET) - response = self.read_packet() - if response != _DISCONNECTED_PACKET: - raise exceptions.InvalidResponse(response=response) - - def get_meter_info(self): - return common.MeterInfo('SD CodeFree glucometer') - - def get_version(self): - raise NotImplementedError - - def get_serial_number(self): - raise NotImplementedError - - def get_glucose_unit(self): - # Device does not provide information on glucose unit. - return common.UNIT_MGDL - - def get_datetime(self): - raise NotImplementedError - - def set_datetime(self, date=datetime.datetime.now()): - setdatecmd = date.strftime('ADATE%Y%m%d%H%M').encode('ascii') - - # Ignore the readings count. - self.wait_and_ready() - - self.send_packet(setdatecmd) - response = self.read_packet() - if response != _DATE_SET_PACKET: - raise exceptions.InvalidResponse(response=response) - - # The date we return should only include up to minute, unfortunately. - return datetime.datetime(date.year, date.month, date.day, - date.hour, date.minute) - - def zero_log(self): - raise NotmplementedError - - def get_readings(self): - count = self.wait_and_ready() - - for _ in range(count): - self.send_packet(_FETCH_PACKET) - rpkt = self.read_packet() - - r = parse_reading(rpkt) - meal = _MEAL_FLAG[r.meal_flag] - - yield common.Reading( - datetime.datetime(2000 + r.year, r.month, r.day, r.hour, r.minute), - r.value, meal=meal) + def __init__(self, device): + self.serial_ = serial.Serial( + port=device, baudrate=38400, bytesize=serial.EIGHTBITS, + parity=serial.PARITY_NONE, stopbits=serial.STOPBITS_ONE, + timeout=300, xonxoff=False, rtscts=False, dsrdtr=False, + writeTimeout=None) + + def read_packet(self): + preamble = self.serial_.read(3) + if len(preamble) != 3: + raise exceptione.InvalidResponse( + response='Expected 3 bytes, received %d' % len(preamble)) + if preamble[0:_IDX_LENGTH] != _RECV_PREAMBLE: + raise exceptions.InvalidResponse( + response='Unexpected preamble %r' % pramble[0:_IDX_LENGTH]) + + msglen = preamble[_IDX_LENGTH] + message = self.serial_.read(msglen) + if len(message) != msglen: + raise exception.InvalidResponse( + response='Expected %d bytes, received %d' % + (msglen, len(message))) + if message[_IDX_ETX] != _ETX: + raise exception.InvalidResponse( + response='Unexpected end-of-transmission byte: %02x' % + message[_IDX_ETX]) + + # Calculate the checksum up until before the checksum itself. + msgdata = message[:_IDX_CHECKSUM] + + cksum = xor_checksum(msgdata) + if cksum != message[_IDX_CHECKSUM]: + raise exception.InvalidChecksum(message[_IDX_CHECKSUM], cksum) + + return msgdata + + def wait_and_ready(self): + challenge = self.serial_.read(1) + + # The first packet read may have a prefixed zero, it might be a bug in + # the cp210x driver or device, but discard it if found. + if challenge == b'\0': + challege = self.serial_.read(1) + if challenge != b'\x53': + raise exceptions.ConnectionFailed( + message='Unexpected starting bytes %r' % challenge) + + challenge += self.serial_.read(6) + + if challenge != _CHALLENGE_PACKET_FULL: + raise exceptions.ConnectionFailed( + message='Unexpected challenge %r' % challenge) + + self.send_packet(_RESPONSE_PACKET) + + # The first packet only contains the counter of how many readings are + # available. + first_packet = self.read_packet() + + count = _STRUCT_READINGS_COUNT.unpack_from(first_packet, 1) + + return count[0] + + def send_packet(self, msgdata): + packet = array.array('B') + packet.extend((_STX, _DIR_OUT, len(msgdata)+2)) + packet.extend(msgdata) + packet.extend((xor_checksum(msgdata), _ETX)) + self.serial_.write(packet.tobytes()) + + def connect(self): + print("Please connect and turn on the device.") + + def disconnect(self): + self.send_packet(_DISCONNECT_PACKET) + response = self.read_packet() + if response != _DISCONNECTED_PACKET: + raise exceptions.InvalidResponse(response=response) + + def get_meter_info(self): + return common.MeterInfo('SD CodeFree glucometer') + + def get_version(self): + raise NotImplementedError + + def get_serial_number(self): + raise NotImplementedError + + def get_glucose_unit(self): + # Device does not provide information on glucose unit. + return common.UNIT_MGDL + + def get_datetime(self): + raise NotImplementedError + + def set_datetime(self, date=datetime.datetime.now()): + setdatecmd = date.strftime('ADATE%Y%m%d%H%M').encode('ascii') + + # Ignore the readings count. + self.wait_and_ready() + + self.send_packet(setdatecmd) + response = self.read_packet() + if response != _DATE_SET_PACKET: + raise exceptions.InvalidResponse(response=response) + + # The date we return should only include up to minute, unfortunately. + return datetime.datetime(date.year, date.month, date.day, + date.hour, date.minute) + + def zero_log(self): + raise NotmplementedError + + def get_readings(self): + count = self.wait_and_ready() + + for _ in range(count): + self.send_packet(_FETCH_PACKET) + rpkt = self.read_packet() + + r = parse_reading(rpkt) + meal = _MEAL_FLAG[r.meal_flag] + + yield common.Reading( + datetime.datetime( + 2000 + r.year, r.month, r.day, r.hour, r.minute), + r.value, meal=meal) -- cgit v1.2.3