summaryrefslogtreecommitdiffstats
path: root/glucometerutils/drivers/otultraeasy.py
diff options
context:
space:
mode:
Diffstat (limited to 'glucometerutils/drivers/otultraeasy.py')
-rw-r--r--glucometerutils/drivers/otultraeasy.py149
1 files changed, 79 insertions, 70 deletions
diff --git a/glucometerutils/drivers/otultraeasy.py b/glucometerutils/drivers/otultraeasy.py
index 7f4934e..0d1e7a9 100644
--- a/glucometerutils/drivers/otultraeasy.py
+++ b/glucometerutils/drivers/otultraeasy.py
@@ -22,87 +22,97 @@ import logging
import construct
from glucometerutils import common
-from glucometerutils.support import construct_extras, driver_base, lifescan, lifescan_binary_protocol, serial
+from glucometerutils.support import (
+ construct_extras,
+ driver_base,
+ lifescan,
+ lifescan_binary_protocol,
+ serial,
+)
_PACKET = lifescan_binary_protocol.LifeScanPacket(True)
_INVALID_RECORD = 501
-_COMMAND_SUCCESS = construct.Const(b'\x05\x06')
+_COMMAND_SUCCESS = construct.Const(b"\x05\x06")
-_VERSION_REQUEST = construct.Const(b'\x05\x0d\x02')
+_VERSION_REQUEST = construct.Const(b"\x05\x0d\x02")
_VERSION_RESPONSE = construct.Struct(
_COMMAND_SUCCESS,
- 'version' / construct.PascalString(construct.Byte, encoding='ascii'),
+ "version" / construct.PascalString(construct.Byte, encoding="ascii"),
)
_SERIAL_NUMBER_REQUEST = construct.Const(
- b'\x05\x0B\x02\x00\x00\x00\x00\x00\x00\x00\x00\x00')
+ b"\x05\x0B\x02\x00\x00\x00\x00\x00\x00\x00\x00\x00"
+)
_SERIAL_NUMBER_RESPONSE = construct.Struct(
- _COMMAND_SUCCESS,
- 'serial_number' / construct.GreedyString(encoding='ascii'),
+ _COMMAND_SUCCESS, "serial_number" / construct.GreedyString(encoding="ascii"),
)
_DATETIME_REQUEST = construct.Struct(
- construct.Const(b'\x05\x20'), # 0x20 is the datetime
- 'request_type' / construct.Enum(construct.Byte, write=0x01, read=0x02),
- 'timestamp' / construct.Default(
+ construct.Const(b"\x05\x20"), # 0x20 is the datetime
+ "request_type" / construct.Enum(construct.Byte, write=0x01, read=0x02),
+ "timestamp"
+ / construct.Default(
construct_extras.Timestamp(construct.Int32ul), # type: ignore
- datetime.datetime(1970, 1, 1, 0, 0)),
+ datetime.datetime(1970, 1, 1, 0, 0),
+ ),
)
_DATETIME_RESPONSE = construct.Struct(
_COMMAND_SUCCESS,
- 'timestamp' / construct_extras.Timestamp(construct.Int32ul), # type: ignore
+ "timestamp" / construct_extras.Timestamp(construct.Int32ul), # type: ignore
)
-_GLUCOSE_UNIT_REQUEST = construct.Const(
- b'\x05\x09\x02\x09\x00\x00\x00\x00')
+_GLUCOSE_UNIT_REQUEST = construct.Const(b"\x05\x09\x02\x09\x00\x00\x00\x00")
_GLUCOSE_UNIT_RESPONSE = construct.Struct(
_COMMAND_SUCCESS,
- 'unit' / lifescan_binary_protocol.GLUCOSE_UNIT,
+ "unit" / lifescan_binary_protocol.GLUCOSE_UNIT,
construct.Padding(3),
)
-_MEMORY_ERASE_REQUEST = construct.Const(b'\x05\x1A')
+_MEMORY_ERASE_REQUEST = construct.Const(b"\x05\x1A")
_READING_COUNT_RESPONSE = construct.Struct(
- construct.Const(b'\x0f'),
- 'count' / construct.Int16ul,
+ construct.Const(b"\x0f"), "count" / construct.Int16ul,
)
_READ_RECORD_REQUEST = construct.Struct(
- construct.Const(b'\x05\x1f'),
- 'record_id' / construct.Int16ul,
+ construct.Const(b"\x05\x1f"), "record_id" / construct.Int16ul,
)
_READING_RESPONSE = construct.Struct(
_COMMAND_SUCCESS,
- 'timestamp' / construct_extras.Timestamp(construct.Int32ul), # type: ignore
- 'value' / construct.Int32ul,
+ "timestamp" / construct_extras.Timestamp(construct.Int32ul), # type: ignore
+ "value" / construct.Int32ul,
)
-def _make_packet(
- message, sequence_number, expect_receive, acknowledge, disconnect):
+
+def _make_packet(message, sequence_number, expect_receive, acknowledge, disconnect):
return _PACKET.build(
- {'data': {'value': {
- 'message': message,
- 'link_control': {
- 'sequence_number': sequence_number,
- 'expect_receive': expect_receive,
- 'acknowledge': acknowledge,
- 'disconnect': disconnect,
- },
- }}})
+ {
+ "data": {
+ "value": {
+ "message": message,
+ "link_control": {
+ "sequence_number": sequence_number,
+ "expect_receive": expect_receive,
+ "acknowledge": acknowledge,
+ "disconnect": disconnect,
+ },
+ }
+ }
+ }
+ )
class Device(serial.SerialDevice, driver_base.GlucometerDriver):
BAUDRATE = 9600
- DEFAULT_CABLE_ID = '067b:2303' # Generic PL2303 cable.
+ DEFAULT_CABLE_ID = "067b:2303" # Generic PL2303 cable.
TIMEOUT = 0.5
def __init__(self, device):
@@ -110,12 +120,11 @@ class Device(serial.SerialDevice, driver_base.GlucometerDriver):
self.sent_counter_ = False
self.expect_receive_ = False
- self.buffered_reader_ = construct.Rebuffered(
- _PACKET, tailcutoff=1024)
+ self.buffered_reader_ = construct.Rebuffered(_PACKET, tailcutoff=1024)
def connect(self):
try:
- self._send_packet(b'', disconnect=True)
+ self._send_packet(b"", disconnect=True)
self._read_ack()
except construct.ConstructError as e:
raise lifescan.MalformedCommand(str(e))
@@ -125,33 +134,32 @@ class Device(serial.SerialDevice, driver_base.GlucometerDriver):
def _send_packet(self, message, acknowledge=False, disconnect=False):
pkt = _make_packet(
- message,
- self.sent_counter_,
- self.expect_receive_,
- acknowledge,
- disconnect)
- logging.debug('sending packet: %s', binascii.hexlify(pkt))
+ message, self.sent_counter_, self.expect_receive_, acknowledge, disconnect
+ )
+ logging.debug("sending packet: %s", binascii.hexlify(pkt))
self.serial_.write(pkt)
self.serial_.flush()
def _read_packet(self):
raw_pkt = self.buffered_reader_.parse_stream(self.serial_).data
- logging.debug('received packet: %r', raw_pkt)
+ logging.debug("received packet: %r", raw_pkt)
# discard the checksum and copy
pkt = raw_pkt.value
if not pkt.link_control.disconnect and (
- pkt.link_control.sequence_number != self.expect_receive_):
+ pkt.link_control.sequence_number != self.expect_receive_
+ ):
raise lifescan.MalformedCommand(
- 'at position 2[0b] expected %02x, received %02x' % (
- self.expect_receive_, pkt.link_connect.sequence_count))
+ "at position 2[0b] expected %02x, received %02x"
+ % (self.expect_receive_, pkt.link_connect.sequence_count)
+ )
return pkt
def _send_ack(self):
- self._send_packet(b'', acknowledge=True, disconnect=False)
+ self._send_packet(b"", acknowledge=True, disconnect=False)
def _read_ack(self):
pkt = self._read_packet()
@@ -177,62 +185,63 @@ class Device(serial.SerialDevice, driver_base.GlucometerDriver):
def get_meter_info(self):
return common.MeterInfo(
- 'OneTouch Ultra Easy glucometer',
+ "OneTouch Ultra Easy glucometer",
serial_number=self.get_serial_number(),
- version_info=(
- 'Software version: ' + self.get_version(),),
- native_unit=self.get_glucose_unit())
+ version_info=("Software version: " + self.get_version(),),
+ native_unit=self.get_glucose_unit(),
+ )
def get_version(self):
- response = self._send_request(
- _VERSION_REQUEST, None, _VERSION_RESPONSE)
+ response = self._send_request(_VERSION_REQUEST, None, _VERSION_RESPONSE)
return response.version
def get_serial_number(self):
response = self._send_request(
- _SERIAL_NUMBER_REQUEST, None, _SERIAL_NUMBER_RESPONSE)
+ _SERIAL_NUMBER_REQUEST, None, _SERIAL_NUMBER_RESPONSE
+ )
return response.serial_number
def get_datetime(self):
response = self._send_request(
- _DATETIME_REQUEST, {'request_type': 'read'},
- _DATETIME_RESPONSE)
+ _DATETIME_REQUEST, {"request_type": "read"}, _DATETIME_RESPONSE
+ )
return response.timestamp
def _set_device_datetime(self, date):
response = self._send_request(
- _DATETIME_REQUEST, {
- 'request_type': 'write',
- 'timestamp': date,
- }, _DATETIME_RESPONSE)
+ _DATETIME_REQUEST,
+ {"request_type": "write", "timestamp": date,},
+ _DATETIME_RESPONSE,
+ )
return response.timestamp
def zero_log(self):
- self._send_request(
- _MEMORY_ERASE_REQUEST, None,
- _COMMAND_SUCCESS)
+ self._send_request(_MEMORY_ERASE_REQUEST, None, _COMMAND_SUCCESS)
def get_glucose_unit(self):
response = self._send_request(
- _GLUCOSE_UNIT_REQUEST, None, _GLUCOSE_UNIT_RESPONSE)
+ _GLUCOSE_UNIT_REQUEST, None, _GLUCOSE_UNIT_RESPONSE
+ )
return response.unit
def _get_reading_count(self):
response = self._send_request(
- _READ_RECORD_REQUEST, {'record_id': _INVALID_RECORD},
- _READING_COUNT_RESPONSE)
+ _READ_RECORD_REQUEST,
+ {"record_id": _INVALID_RECORD},
+ _READING_COUNT_RESPONSE,
+ )
return response.count
def _get_reading(self, record_id):
response = self._send_request(
- _READ_RECORD_REQUEST, {'record_id': record_id}, _READING_RESPONSE)
+ _READ_RECORD_REQUEST, {"record_id": record_id}, _READING_RESPONSE
+ )
- return common.GlucoseReading(
- response.timestamp, float(response.value))
+ return common.GlucoseReading(response.timestamp, float(response.value))
def get_readings(self):
record_count = self._get_reading_count()