diff options
Diffstat (limited to 'glucometerutils/drivers/otverio2015.py')
-rw-r--r-- | glucometerutils/drivers/otverio2015.py | 123 |
1 files changed, 57 insertions, 66 deletions
diff --git a/glucometerutils/drivers/otverio2015.py b/glucometerutils/drivers/otverio2015.py index bde0af3..b8429ea 100644 --- a/glucometerutils/drivers/otverio2015.py +++ b/glucometerutils/drivers/otverio2015.py @@ -29,67 +29,61 @@ import construct from pyscsi.pyscsi.scsi import SCSI from pyscsi.pyscsi.scsi_device import SCSIDevice -from glucometerutils import common -from glucometerutils import exceptions +from glucometerutils import common, exceptions from glucometerutils.support import driver_base, lifescan, lifescan_binary_protocol # This device uses SCSI blocks as registers. _REGISTER_SIZE = 512 _PACKET = construct.Padded( - _REGISTER_SIZE, - lifescan_binary_protocol.LifeScanPacket(False)) + _REGISTER_SIZE, lifescan_binary_protocol.LifeScanPacket(False) +) -_COMMAND_SUCCESS = construct.Const(b'\x03\x06') +_COMMAND_SUCCESS = construct.Const(b"\x03\x06") _QUERY_REQUEST = construct.Struct( - construct.Const(b'\x03\xe6\x02'), - 'selector' / construct.Enum( - construct.Byte, serial=0x00, model=0x01, software=0x02), + construct.Const(b"\x03\xe6\x02"), + "selector" / construct.Enum(construct.Byte, serial=0x00, model=0x01, software=0x02), ) _QUERY_RESPONSE = construct.Struct( - construct.Const(b'\x03\x06'), - 'value' / construct.CString(encoding='utf-16-le'), + construct.Const(b"\x03\x06"), "value" / construct.CString(encoding="utf-16-le"), ) _READ_PARAMETER_REQUEST = construct.Struct( - construct.Const(b'\x03'), - 'selector' / construct.Enum( - construct.Byte, unit=0x04), + construct.Const(b"\x03"), "selector" / construct.Enum(construct.Byte, unit=0x04), ) _READ_UNIT_RESPONSE = construct.Struct( _COMMAND_SUCCESS, - 'unit' / lifescan_binary_protocol.GLUCOSE_UNIT, + "unit" / lifescan_binary_protocol.GLUCOSE_UNIT, construct.Padding(3), ) -_READ_RTC_REQUEST = construct.Const(b'\x03\x20\x02') +_READ_RTC_REQUEST = construct.Const(b"\x03\x20\x02") _READ_RTC_RESPONSE = construct.Struct( _COMMAND_SUCCESS, - 'timestamp' / lifescan_binary_protocol.VERIO_TIMESTAMP, # type: ignore + "timestamp" / lifescan_binary_protocol.VERIO_TIMESTAMP, # type: ignore ) _WRITE_RTC_REQUEST = construct.Struct( - construct.Const(b'\x03\x20\x01'), - 'timestamp' / lifescan_binary_protocol.VERIO_TIMESTAMP, # type: ignore + construct.Const(b"\x03\x20\x01"), + "timestamp" / lifescan_binary_protocol.VERIO_TIMESTAMP, # type: ignore ) -_MEMORY_ERASE_REQUEST = construct.Const(b'\x03\x1a') +_MEMORY_ERASE_REQUEST = construct.Const(b"\x03\x1a") -_READ_RECORD_COUNT_REQUEST = construct.Const(b'\x03\x27\x00') +_READ_RECORD_COUNT_REQUEST = construct.Const(b"\x03\x27\x00") _READ_RECORD_COUNT_RESPONSE = construct.Struct( - _COMMAND_SUCCESS, - 'count' / construct.Int16ul, + _COMMAND_SUCCESS, "count" / construct.Int16ul, ) _READ_RECORD_REQUEST = construct.Struct( - construct.Const(b'\x03\x31\x02'), - 'record_id' / construct.Int16ul, - construct.Const(b'\x00'), + construct.Const(b"\x03\x31\x02"), + "record_id" / construct.Int16ul, + construct.Const(b"\x00"), ) _MEAL_FLAG = { @@ -100,13 +94,12 @@ _MEAL_FLAG = { _READ_RECORD_RESPONSE = construct.Struct( _COMMAND_SUCCESS, - 'inverse_counter' / construct.Int16ul, + "inverse_counter" / construct.Int16ul, construct.Padding(1), - 'lifetime_counter' / construct.Int16ul, - 'timestamp' / lifescan_binary_protocol.VERIO_TIMESTAMP, # type: ignore - 'value' / construct.Int16ul, - 'meal' / construct.Mapping( - construct.Byte, _MEAL_FLAG), + "lifetime_counter" / construct.Int16ul, + "timestamp" / lifescan_binary_protocol.VERIO_TIMESTAMP, # type: ignore + "value" / construct.Int16ul, + "meal" / construct.Mapping(construct.Byte, _MEAL_FLAG), construct.Padding(4), ) @@ -115,8 +108,9 @@ class Device(driver_base.GlucometerDriver): def __init__(self, device): if not device: raise exceptions.CommandLineError( - '--device parameter is required, should point to the disk ' - 'device representing the meter.') + "--device parameter is required, should point to the disk " + "device representing the meter." + ) self.device_name_ = device self.scsi_device_ = SCSIDevice(device, readwrite=True) @@ -125,11 +119,12 @@ class Device(driver_base.GlucometerDriver): def connect(self): inq = self.scsi_.inquiry() - logging.debug('Device connected: %r', inq.result) - vendor = inq.result['t10_vendor_identification'][:32] - if vendor != b'LifeScan': + logging.debug("Device connected: %r", inq.result) + vendor = inq.result["t10_vendor_identification"][:32] + if vendor != b"LifeScan": raise exceptions.ConnectionFailed( - 'Device %s is not a LifeScan glucometer.' % self.device_name_) + "Device %s is not a LifeScan glucometer." % self.device_name_ + ) def disconnect(self): # pylint: disable=no-self-use return @@ -155,21 +150,19 @@ class Device(driver_base.GlucometerDriver): """ try: request = request_format.build(request_obj) - request_raw = _PACKET.build({'data': {'value': { - 'message': request, - }}}) - logging.debug( - 'Request sent: %s', binascii.hexlify(request_raw)) + request_raw = _PACKET.build({"data": {"value": {"message": request,}}}) + logging.debug("Request sent: %s", binascii.hexlify(request_raw)) self.scsi_.write10(lba, 1, request_raw) response_raw = self.scsi_.read10(lba, 1) logging.debug( - 'Response received: %s', binascii.hexlify(response_raw.datain)) + "Response received: %s", binascii.hexlify(response_raw.datain) + ) response_pkt = _PACKET.parse(response_raw.datain).data - logging.debug('Response packet: %r', response_pkt) + logging.debug("Response packet: %r", response_pkt) response = response_format.parse(response_pkt.value.message) - logging.debug('Response parsed: %r', response) + logging.debug("Response parsed: %r", response) return response except construct.ConstructError as e: @@ -177,60 +170,58 @@ class Device(driver_base.GlucometerDriver): def _query_string(self, selector): response = self._send_request( - 3, _QUERY_REQUEST, {'selector': selector}, _QUERY_RESPONSE) + 3, _QUERY_REQUEST, {"selector": selector}, _QUERY_RESPONSE + ) return response.value def get_meter_info(self): return common.MeterInfo( - 'OneTouch %s glucometer' % self._query_string('model'), + "OneTouch %s glucometer" % self._query_string("model"), serial_number=self.get_serial_number(), - version_info=( - 'Software version: ' + self.get_version(),), - native_unit=self.get_glucose_unit()) + version_info=("Software version: " + self.get_version(),), + native_unit=self.get_glucose_unit(), + ) def get_serial_number(self): - return self._query_string('serial') + return self._query_string("serial") def get_version(self): - return self._query_string('software') + return self._query_string("software") def get_datetime(self): - response = self._send_request( - 3, _READ_RTC_REQUEST, None, _READ_RTC_RESPONSE) + response = self._send_request(3, _READ_RTC_REQUEST, None, _READ_RTC_RESPONSE) return response.timestamp def _set_device_datetime(self, date): - self._send_request( - 3, _WRITE_RTC_REQUEST, {'timestamp': date}, - _COMMAND_SUCCESS) + self._send_request(3, _WRITE_RTC_REQUEST, {"timestamp": date}, _COMMAND_SUCCESS) # The device does not return the new datetime, so confirm by calling # READ RTC again. return self.get_datetime() def zero_log(self): - self._send_request( - 3, _MEMORY_ERASE_REQUEST, None, - _COMMAND_SUCCESS) + self._send_request(3, _MEMORY_ERASE_REQUEST, None, _COMMAND_SUCCESS) def get_glucose_unit(self): response = self._send_request( - 4, _READ_PARAMETER_REQUEST, {'selector': 'unit'}, - _READ_UNIT_RESPONSE) + 4, _READ_PARAMETER_REQUEST, {"selector": "unit"}, _READ_UNIT_RESPONSE + ) return response.unit def _get_reading_count(self): response = self._send_request( - 3, _READ_RECORD_COUNT_REQUEST, None, _READ_RECORD_COUNT_RESPONSE) + 3, _READ_RECORD_COUNT_REQUEST, None, _READ_RECORD_COUNT_RESPONSE + ) return response.count def _get_reading(self, record_id): response = self._send_request( - 3, _READ_RECORD_REQUEST, {'record_id': record_id}, - _READ_RECORD_RESPONSE) + 3, _READ_RECORD_REQUEST, {"record_id": record_id}, _READ_RECORD_RESPONSE + ) return common.GlucoseReading( - response.timestamp, float(response.value), meal=response.meal) + response.timestamp, float(response.value), meal=response.meal + ) def get_readings(self): record_count = self._get_reading_count() |