From 7f6a3eeca6aff196e68215d99bb6d48d57d069c3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Diego=20Elio=20Petten=C3=B2?= Date: Mon, 1 Jan 2018 12:46:48 +0000 Subject: otverio2015: rewrite using construct. This simplifies the code a bit here and there, making sure that the structures are all define at the top of the file. It also align the structure of the driver a bit more with otultraeasy, making it easier to spot the similitudes. --- README | 30 +-- glucometerutils/drivers/otverio2015.py | 436 ++++++++++++++++----------------- setup.py | 2 +- 3 files changed, 232 insertions(+), 236 deletions(-) diff --git a/README b/README index 947348c..2e7a460 100644 --- a/README +++ b/README @@ -32,21 +32,21 @@ $ . glucometerutils-venv/bin/activate Please see the following table for the driver for each device that is known and supported. -| Manufacturer | Model Name | Driver | Dependencies | -| --- | --- | --- | --- | -| LifeScan | OneTouch Ultra 2 | `otultra2` | [pyserial] | -| LifeScan | OneTouch Ultra Easy | `otultraeasy` | [construct] [pyserial] | -| LifeScan | OneTouch Ultra Mini | `otultraeasy` | [construct] [pyserial] | -| LifeScan | OneTouch Verio (USB) | `otverio2015` | [python-scsi] | -| LifeScan | OneTouch Select Plus | `otverio2015` | [python-scsi] | -| Abbott | FreeStyle InsuLinx† | `fsinsulinx` | [construct] [hidapi]‡ | -| Abbott | FreeStyle Libre | `fslibre` | [construct] [hidapi]‡ | -| Abbott | FreeStyle Optium | `fsoptium` | [pyserial] | -| Abbott | FreeStyle Precision Neo | `fsprecisionneo` | [construct] [hidapi]‡ | -| Abbott | FreeStyle Optium Neo | `fsprecisionneo` | [construct] [hidapi]‡ | -| Abbott | FreeStyle Optium Neo H | `fsprecisionneo` | [construct] [hidapi]‡ | -| Roche | Accu-Chek Mobile | `accuchek_reports` | | -| SD Biosensor | SD CodeFree | `sdcodefree` | [construct] [pyserial] | +| Manufacturer | Model Name | Driver | Dependencies | +| --- | --- | --- | --- | +| LifeScan | OneTouch Ultra 2 | `otultra2` | [pyserial] | +| LifeScan | OneTouch Ultra Easy | `otultraeasy` | [construct] [pyserial] | +| LifeScan | OneTouch Ultra Mini | `otultraeasy` | [construct] [pyserial] | +| LifeScan | OneTouch Verio (USB) | `otverio2015` | [construct] [python-scsi] | +| LifeScan | OneTouch Select Plus | `otverio2015` | [construct] [python-scsi] | +| Abbott | FreeStyle InsuLinx† | `fsinsulinx` | [construct] [hidapi]‡ | +| Abbott | FreeStyle Libre | `fslibre` | [construct] [hidapi]‡ | +| Abbott | FreeStyle Optium | `fsoptium` | [pyserial] | +| Abbott | FreeStyle Precision Neo | `fsprecisionneo` | [construct] [hidapi]‡ | +| Abbott | FreeStyle Optium Neo | `fsprecisionneo` | [construct] [hidapi]‡ | +| Abbott | FreeStyle Optium Neo H | `fsprecisionneo` | [construct] [hidapi]‡ | +| Roche | Accu-Chek Mobile | `accuchek_reports` | | +| SD Biosensor | SD CodeFree | `sdcodefree` | [construct] [pyserial] | † Untested. ‡ Optional dependency on Linux; required on other operating systems. diff --git a/glucometerutils/drivers/otverio2015.py b/glucometerutils/drivers/otverio2015.py index 5925a58..8d87409 100644 --- a/glucometerutils/drivers/otverio2015.py +++ b/glucometerutils/drivers/otverio2015.py @@ -28,246 +28,242 @@ __license__ = 'MIT' import binascii import datetime import logging -import struct +import construct from pyscsi.pyscsi.scsi import SCSI from pyscsi.pyscsi.scsi_device import SCSIDevice from glucometerutils import common from glucometerutils import exceptions +from glucometerutils.support import construct_extras from glucometerutils.support import lifescan -# Match the same values in the otultraeasy driver. -_STX = 0x02 -_ETX = 0x03 - # This device uses SCSI blocks as registers. _REGISTER_SIZE = 512 -_STRUCT_PREAMBLE = struct.Struct(' _REGISTER_SIZE: - raise lifescan.MalformedCommand( - 'invalid length: %d > REGISTER_SIZE' % length) +_READ_UNIT_RESPONSE = construct.Struct( + construct.Const(b'\x03\x06'), # different from _COMMAND_SUCCESS + 'unit' / construct.SymmetricMapping( + construct.Byte, _GLUCOSE_MAPPING), + construct.Padding(3), +) - # 2 is the length of the checksum, so it should be ignored. - calculated_checksum = lifescan.crc_ccitt(register[:(length-2)]) +_READ_RTC_REQUEST = construct.Const(b'\x04\x20\x02') - coda_offset = length - _STRUCT_CODA.size - etx, encoded_checksum = _STRUCT_CODA.unpack_from(register[coda_offset:]) - if etx != _ETX: - raise lifescan.MalformedCommand( - 'invalid ETX byte: %02x' % etx) - if encoded_checksum != calculated_checksum: - raise exceptions.InvalidChecksum(encoded_checksum, calculated_checksum) +_READ_RTC_RESPONSE = construct.Struct( + _COMMAND_SUCCESS, + 'timestamp' / _TIMESTAMP, +) - response = register[_STRUCT_PREAMBLE.size:coda_offset] +_WRITE_RTC_REQUEST = construct.Struct( + construct.Const(b'\x04\x20\x01'), + 'timestamp' / _TIMESTAMP, +) - logging.debug('Read packet: %s' % binascii.hexlify(response)) - return response +_MEMORY_ERASE_REQUEST = construct.Const(b'\x04\x1a') -def _encode_message(cmd): - """Add message preamble and calculate checksum, add padding.""" - length = len(cmd) + _STRUCT_PREAMBLE.size + _STRUCT_CODA.size - preamble = _STRUCT_PREAMBLE.pack(_STX, length) - message = preamble + cmd + bytes((_ETX,)) - checksum = _STRUCT_CHECKSUM.pack(lifescan.crc_ccitt(message)) - message += checksum +_READ_RECORD_COUNT_REQUEST = construct.Const(b'\x04\x27\x00') - logging.debug('Sending packet: %s' % binascii.hexlify(message)) +_READ_RECORD_COUNT_RESPONSE = construct.Struct( + _COMMAND_SUCCESS, + 'count' / construct.Int16ul, +) - # Pad the message to match the size of the register. - return message + bytes(_REGISTER_SIZE - len(message)) +_READ_RECORD_REQUEST = construct.Struct( + construct.Const(b'\x04\x31\x02'), + 'record_id' / construct.Int16ul, + construct.Const(b'\x00'), +) + +_MEAL_FLAG = { + common.Meal.NONE: 0x00, + common.Meal.BEFORE: 0x01, + common.Meal.AFTER: 0x02, +} -def _convert_timestamp(timestamp): - return datetime.datetime.utcfromtimestamp(timestamp + _EPOCH_BASE) +_READ_RECORD_RESPONSE = construct.Struct( + _COMMAND_SUCCESS, + 'inverse_counter' / construct.Int16ul, + construct.Padding(1), + 'lifetime_counter' / construct.Int16ul, + 'timestamp' / _TIMESTAMP, + 'value' / construct.Int16ul, + 'meal' / construct.SymmetricMapping( + construct.Byte, _MEAL_FLAG), + construct.Padding(4), +) class Device(object): - def __init__(self, device): - if not device: - raise exceptions.CommandLineError( - '--device parameter is required, should point to the disk device ' - 'representing the meter.') - - self.device_name_ = device - self.scsi_device_ = SCSIDevice(device, readwrite=True) - self.scsi_ = SCSI(self.scsi_device_) - self.scsi_.blocksize = _REGISTER_SIZE - - def _send_message(self, cmd, lba): - """Send a request to the meter, and read its response. - - Args: - cmd: (bytes) the raw command to send the device, without - preamble or checksum. - lba: (int) the address of the block register to use, known - valid addresses are 3, 4 and 5. - - Returns: - (bytes) The raw response from the meter. No preamble or coda is - present, and the checksum has already been validated. - """ - self.scsi_.write10(lba, 1, _encode_message(cmd)) - response = self.scsi_.read10(lba, 1) - # TODO: validate that the response is valid. - return _extract_message(response.datain) - - def connect(self): - inq = self.scsi_.inquiry() - vendor = inq.result['t10_vendor_identification'][:32] - if vendor != b'LifeScan': - raise exceptions.ConnectionFailed( - 'Device %s is not a LifeScan glucometer.' % self.device_name_) - - def disconnect(self): - return - - def get_meter_info(self): - return common.MeterInfo( - 'OneTouch %s glucometer' % self._query_string(_QUERY_KEY_MODEL), - serial_number=self.get_serial_number(), - version_info=( - 'Software version: ' + self.get_version(),), - native_unit=self.get_glucose_unit()) - - def _query_string(self, query_key): - response = self._send_message(_QUERY_REQUEST + query_key, 3) - if response[0:2] != b'\x04\06': - raise lifescan.MalformedCommand( - 'invalid response, expected 04 06, received %02x %02x' % ( - response[0], response[1])) - # Strings are encoded in wide characters (LE), but they should - # only contain ASCII characters. Note that the string is - # null-terminated, so the last character should be dropped. - return response[2:].decode('utf-16-le')[:-1] - - def _read_parameter(self, parameter_key): - response = self._send_message( - _READ_PARAMETER_REQUEST + parameter_key, 4) - if response[0:2] != b'\x03\x06': - raise lifescan.MalformedCommand( - 'invalid response, expected 03 06, received %02x %02x' % ( - response[0], response[1])) - return response[2:] - - def get_serial_number(self): - return self._query_string(_QUERY_KEY_SERIAL) - - def get_version(self): - return self._query_string(_QUERY_KEY_SOFTWARE) - - def get_datetime(self): - response = self._send_message(_READ_RTC_REQUEST, 3) - if response[0:2] != b'\x04\06': - raise lifescan.MalformedCommand( - 'invalid response, expected 04 06, received %02x %02x' % ( - response[0], response[1])) - (timestamp,) = _STRUCT_TIMESTAMP.unpack(response[2:]) - return _convert_timestamp(timestamp) - - def set_datetime(self, date=datetime.datetime.now()): - epoch = datetime.datetime.utcfromtimestamp(_EPOCH_BASE) - delta = date - epoch - timestamp = int(delta.total_seconds()) - - timestamp_bytes = _STRUCT_TIMESTAMP.pack(timestamp) - response = self._send_message(_WRITE_RTC_REQUEST + timestamp_bytes, 3) - - if response[0:2] != b'\x04\06': - raise lifescan.MalformedCommand( - 'invalid response, expected 04 06, received %02x %02x' % ( - response[0], response[1])) - - # The device does not return the new datetime, so confirm by - # calling READ RTC again. - return self.get_datetime() - - def zero_log(self): - response = self._send_message(_MEMORY_ERASE_REQUEST, 3) - if response[0:2] != b'\x04\06': - raise lifescan.MalformedCommand( - 'invalid response, expected 04 06, received %02x %02x' % ( - response[0], response[1])) - - def _get_reading_count(self): - response = self._send_message(_READ_RECORD_COUNT_REQUEST, 3) - if response[0:2] != b'\x04\06': - raise lifescan.MalformedCommand( - 'invalid response, expected 04 06, received %02x %02x' % ( - response[0], response[1])) - - (record_count,) = _STRUCT_RECORDID.unpack(response[2:]) - return record_count - - def get_glucose_unit(self): - unit_value = self._read_parameter(_PARAMETER_KEY_UNIT) - if unit_value == b'\x00\x00\x00\x00': - return common.Unit.MG_DL - elif unit_value == b'\x01\x00\x00\x00': - return common.Unit.MMOL_L - else: - raise exceptions.InvalidGlucoseUnit('%r' % unit_value) - - def _get_reading(self, record_number): - request = (_READ_RECORD_REQUEST_PREFIX + - _STRUCT_RECORDID.pack(record_number) + - _READ_RECORD_REQUEST_SUFFIX) - response = self._send_message(request, 3) - if response[0:2] != b'\x04\06': - raise lifescan.MalformedCommand( - 'invalid response, expected 04 06, received %02x %02x' % ( - response[0], response[1])) - - (unused_const1, unused_const2, unused_counter, unused_const3, - unused_counter2, timestamp, value, meal_flag, unused_const4, unused_flags, - unused_const5, unused_const6) = _STRUCT_RECORD.unpack( - response) - - return common.GlucoseReading( - _convert_timestamp(timestamp), float(value), meal=_MEAL_CODES[meal_flag]) - - def get_readings(self): - record_count = self._get_reading_count() - for record_number in range(record_count): - yield self._get_reading(record_number) + def __init__(self, device): + if not device: + raise exceptions.CommandLineError( + '--device parameter is required, should point to the disk ' + 'device representing the meter.') + + self.device_name_ = device + self.scsi_device_ = SCSIDevice(device, readwrite=True) + self.scsi_ = SCSI(self.scsi_device_) + self.scsi_.blocksize = _REGISTER_SIZE + + def connect(self): + inq = self.scsi_.inquiry() + logging.debug('Device connected: %r', inq.result) + vendor = inq.result['t10_vendor_identification'][:32] + if vendor != b'LifeScan': + raise exceptions.ConnectionFailed( + 'Device %s is not a LifeScan glucometer.' % self.device_name_) + + def disconnect(self): + return + + def _send_request(self, lba, request_format, request_obj, response_format): + """Send a request to the meter, and read its response. + + Args: + lba: (int) the address of the block register to use, known + valid addresses are 3, 4 and 5. + request_format: a construct format identifier of the request to send + request_obj: the object to format with the provided identifier + response_format: a construct format identifier to parse the returned + message with. + + Returns: + The Container object parsed from the response received by the meter. + + Raises: + lifescan.MalformedCommand if Construct fails to build the request or + parse the response. + + """ + try: + request = request_format.build(request_obj) + request_raw = _PACKET.build({'value': {'message': request}}) + logging.debug( + 'Request sent: %s', binascii.hexlify(request_raw)) + self.scsi_.write10(lba, 1, request_raw) + + response_raw = self.scsi_.read10(lba, 1) + logging.debug( + 'Response received: %s', binascii.hexlify(response_raw.datain)) + response_pkt = _PACKET.parse(response_raw.datain) + logging.debug('Response packet: %r', response_pkt) + + response = response_format.parse(response_pkt.value.message) + logging.debug('Response parsed: %r', response) + + return response + except construct.ConstructError as e: + raise lifescan.MalformedCommand(str(e)) + + def _query_string(self, selector): + response = self._send_request( + 3, _QUERY_REQUEST, {'selector': selector}, _QUERY_RESPONSE) + + # Unfortunately the CString implementation in construct does not support + # multi-byte encodings, so we need to discard the terminating null byte + # ourself. + return response.value[:-1] + + def get_meter_info(self): + return common.MeterInfo( + 'OneTouch %s glucometer' % self._query_string('model'), + serial_number=self.get_serial_number(), + version_info=( + 'Software version: ' + self.get_version(),), + native_unit=self.get_glucose_unit()) + + def get_serial_number(self): + return self._query_string('serial') + + def get_version(self): + return self._query_string('software') + + def get_datetime(self): + response = self._send_request( + 3, _READ_RTC_REQUEST, None, _READ_RTC_RESPONSE) + return response.timestamp + + def set_datetime(self, date=datetime.datetime.now()): + self._send_request( + 3, _WRITE_RTC_REQUEST, {'timestamp': date}, + _COMMAND_SUCCESS) + + # The device does not return the new datetime, so confirm by calling + # READ RTC again. + return self.get_datetime() + + def zero_log(self): + self._send_request( + 3, _MEMORY_ERASE_REQUEST, None, _COMMAND_SUCCESS) + + def _get_reading_count(self): + response = self._send_request( + 3, _READ_RECORD_COUNT_REQUEST, None, _READ_RECORD_COUNT_RESPONSE) + return response.count + + def get_glucose_unit(self): + response = self._send_request( + 4, _READ_PARAMETER_REQUEST, {'selector': 'unit'}, + _READ_UNIT_RESPONSE) + return response.unit + + def _get_reading(self, record_id): + response = self._send_request( + 3, _READ_RECORD_REQUEST, {'record_id': record_id}, + _READ_RECORD_RESPONSE) + return common.GlucoseReading( + response.timestamp, float(response.value), meal=response.meal) + + def get_readings(self): + record_count = self._get_reading_count() + for record_id in range(record_count): + yield self._get_reading(record_id) diff --git a/setup.py b/setup.py index 3be4b94..212425a 100644 --- a/setup.py +++ b/setup.py @@ -50,7 +50,7 @@ setup( # listed as mandatory for the feature. 'otultra2': ['pyserial'], 'otultraeasy': ['construct', 'pyserial'], - 'otverio2015': ['python-scsi'], + 'otverio2015': ['construct', 'python-scsi'], 'fsinsulinx': ['construct', 'hidapi'], 'fslibre': ['construct', 'hidapi'], 'fsoptium': ['pyserial'], -- cgit v1.2.3