diff options
-rw-r--r-- | README | 5 | ||||
-rw-r--r-- | glucometerutils/drivers/otverio2015.py | 226 |
2 files changed, 231 insertions, 0 deletions
@@ -23,9 +23,14 @@ Supported devices * **LifeScan OneTouch Ultra Easy** (also known as **Ultra Mini**): get information, get and set time, dump of readings in native units, memory reset. + * **LifeScan OneTouch Verio** (USB version), **LifeScan OneTouch + Select Plus**: get information, get and set time, dump of + readings. Requires [python-scsi][1]. * **Abbott FreeStyle Optium**: get information, get and set time, dump of readings in native units. +[1] https://github.com/rosjat/python-scsi + Dump format ----------- diff --git a/glucometerutils/drivers/otverio2015.py b/glucometerutils/drivers/otverio2015.py new file mode 100644 index 0000000..44f3573 --- /dev/null +++ b/glucometerutils/drivers/otverio2015.py @@ -0,0 +1,226 @@ +# -*- coding: utf-8 -*- +"""Driver for LifeScan OneTouch Verio 2015 devices. + +Further information on the device protocol can be found at + +https://github.com/Flameeyes/glucometer-protocols/blob/master/lifescan/onetouch-verio-2015.md +""" + +__author__ = 'Diego Elio Pettenò' +__email__ = 'flameeyes@flameeyes.eu' +__copyright__ = 'Copyright © 2016, Diego Elio Pettenò' +__license__ = 'MIT' + +import datetime +import struct + +from pyscsi.pyscsi.scsi import SCSI +from pyscsi.pyscsi.scsi_device import SCSIDevice + +from glucometerutils import common +from glucometerutils import exceptions +from glucometerutils.drivers import lifescan_common + +# Match the same values in the otultraeasy driver. +_STX = 0x02 +_ETX = 0x03 + +# This device uses SCSI blocks as registers. +_REGISTER_SIZE = 512 + +_STRUCT_PREAMBLE = struct.Struct('<BH') +_STRUCT_CODA = _STRUCT_PREAMBLE # they are actually the same, mirrored. + +_STRUCT_UINT16 = struct.Struct('<H') +_STRUCT_UINT32 = struct.Struct('<I') + +_STRUCT_CHECKSUM = _STRUCT_UINT16 +_STRUCT_TIMESTAMP = _STRUCT_UINT32 +_STRUCT_RECORDID = _STRUCT_UINT16 +_STRUCT_READING = _STRUCT_UINT32 +_STRUCT_RECORD = struct.Struct('<BBHBHIIBBB') + +_QUERY_REQUEST = b'\x04\xe6\x02' +_QUERY_KEY_SERIAL = b'\x00' +_QUERY_KEY_MODEL = b'\x01' +_QUERY_KEY_SOFTWARE = b'\x02' + +_READ_RTC_REQUEST = b'\x04\x20\x02' +_WRITE_RTC_REQUEST = b'\x04\x20\x01' +# All timestamp reported by this device are seconds since this date. +_EPOCH_BASE = 946684800 # 2010-01-01 00:00 + +_READ_RECORD_COUNT_REQUEST = b'\x04\x27\x00' +_READ_RECORD_REQUEST_PREFIX = b'\x04\x31\x02' +_READ_RECORD_REQUEST_SUFFIX = b'\x00' + +_MEMORY_ERASE_REQUEST = b'\x04\x1a' + +def _extract_message(register): + """Parse the message preamble and verify checksums.""" + stx, length = _STRUCT_PREAMBLE.unpack_from(register) + if stx != _STX: + raise lifescan_common.MalformedCommand( + 'invalid STX byte: %02x' % stx) + if length > _REGISTER_SIZE: + raise lifescan_common.MalformedCommand( + 'invalid length: %d > REGISTER_SIZE' % length) + + # 2 is the length of the checksum, so it should be ignored. + calculated_checksum = lifescan_common.crc_ccitt(register[:(length-2)]) + + coda_offset = length - _STRUCT_CODA.size + etx, encoded_checksum = _STRUCT_CODA.unpack_from(register[coda_offset:]) + if etx != _ETX: + raise lifescan_common.MalformedCommand( + 'invalid ETX byte: %02x' % etx) + if encoded_checksum != calculated_checksum: + raise lifescan_common.InvalidChecksum( + encoded_checksum, calculated_checksum) + + response = register[_STRUCT_PREAMBLE.size:coda_offset] + return response + +def _encode_message(cmd): + """Add message preamble and calculate checksum, add padding.""" + length = len(cmd) + _STRUCT_PREAMBLE.size + _STRUCT_CODA.size + preamble = _STRUCT_PREAMBLE.pack(_STX, length) + message = preamble + cmd + bytes((_ETX,)) + checksum = _STRUCT_CHECKSUM.pack(lifescan_common.crc_ccitt(message)) + + # Pad the message to match the size of the register. + return message + checksum + bytes( + _REGISTER_SIZE - 2 - len(message)) + +def _convert_timestamp(timestamp): + return datetime.datetime.fromtimestamp(timestamp + _EPOCH_BASE) + +class Device(object): + def __init__(self, device): + self.device_name_ = device + self.scsi_device_ = SCSIDevice(device) + self.scsi_ = SCSI(self.scsi_device_) + self.scsi_.blocksize = _REGISTER_SIZE + + def _send_message(self, cmd, lba): + """Send a request to the meter, and read its response. + + Args: + cmd: (bytes) the raw command to send the device, without + preamble or checksum. + lba: (int) the address of the block register to use, known + valid addresses are 3, 4 and 5. + + Returns: + (bytes) The raw response from the meter. No preamble or coda is + present, and the checksum has already been validated. + """ + self.scsi_.write10(lba, 1, _encode_message(cmd)) + response = self.scsi_.read10(lba, 1) + # TODO: validate that the response is valid. + return _extract_message(response.datain) + + def connect(self): + inq = self.scsi_.inquiry() + vendor = inq.result['t10_vendor_identification'][:32] + if vendor != b'LifeScan': + raise exceptions.ConnectionFailed( + 'Device %s is not a LifeScan glucometer.' % self.device_name_) + + def disconnect(self): + return + + def get_information_string(self): + return ('OneTouch %s glucometer\n' + 'Serial number: %s\n' + 'Software version: %s\n' + 'Time: %s\n' + 'Default unit: unknown\n' % ( + self._query_string(_QUERY_KEY_MODEL), + self.get_serial_number(), + self.get_version(), + self.get_datetime())) + + def _query_string(self, query_key): + response = self._send_message(_QUERY_REQUEST + query_key, 3) + if response[0:2] != b'\x04\06': + raise lifescan_common.MalformedCommand( + 'invalid response, expected 04 06, received %02x %02x' % ( + response[0], response[1])) + # Strings are encoded in wide characters (LE), but they should + # only contain ASCII characters. Note that the string is + # null-terminated, so the last character should be dropped. + return response[2:].decode('utf-16-le')[:-1] + + def get_serial_number(self): + return self._query_string(_QUERY_KEY_SERIAL) + + def get_version(self): + return self._query_string(_QUERY_KEY_SOFTWARE) + + def get_datetime(self): + response = self._send_message(_READ_RTC_REQUEST, 3) + if response[0:2] != b'\x04\06': + raise lifescan_common.MalformedCommand( + 'invalid response, expected 04 06, received %02x %02x' % ( + response[0], response[1])) + (timestamp,) = _STRUCT_TIMESTAMP.unpack(response[2:]) + return _convert_timestamp(timestamp) + + def set_datetime(self, date=datetime.datetime.now()): + epoch = datetime.datetime.utcfromtimestamp(_EPOCH_BASE) + delta = date - epoch + timestamp = int(delta.total_seconds()) + + timestamp_bytes = _STRUCT_TIMESTAMP.pack(timestamp) + response = self._send_message(_WRITE_RTC_REQUEST + timestamp_bytes, 3) + + if response[0:2] != b'\x04\06': + raise lifescan_common.MalformedCommand( + 'invalid response, expected 04 06, received %02x %02x' % ( + response[0], response[1])) + + # The device does not return the new datetime, so confirm by + # calling READ RTC again. + return self.get_datetime() + + def zero_log(self): + response = self._send_message(_MEMORY_ERASE_REQUEST, 3) + if response[0:2] != b'\x04\06': + raise lifescan_common.MalformedCommand( + 'invalid response, expected 04 06, received %02x %02x' % ( + response[0], response[1])) + + def _get_reading_count(self): + response = self._send_message(_READ_RECORD_COUNT_REQUEST, 3) + if response[0:2] != b'\x04\06': + raise lifescan_common.MalformedCommand( + 'invalid response, expected 04 06, received %02x %02x' % ( + response[0], response[1])) + + (record_count,) = _STRUCT_RECORDID.unpack(response[2:]) + return record_count + + def get_glucose_unit(self): + return common.UNIT_MGDL + + def _get_reading(self, record_number): + request = (_READ_RECORD_REQUEST_PREFIX + + _STRUCT_RECORDID.pack(record_number) + + _READ_RECORD_REQUEST_SUFFIX) + response = self._send_message(request, 3) + if response[0:2] != b'\x04\06': + raise lifescan_common.MalformedCommand( + 'invalid response, expected 04 06, received %02x %02x' % ( + response[0], response[1])) + + (unused_const1, unused_const2, unused_counter, unused_const3, + unused_counter2, timestamp, value, unused_flags, unused_const4, + unused_const5) = _STRUCT_RECORD.unpack(response) + + return common.Reading(_convert_timestamp(timestamp), float(value)) + + def get_readings(self): + record_count = self._get_reading_count() + for record_number in range(record_count): + yield self._get_reading(record_number) |