# -*- coding: utf-8 -*-
#
# SPDX-FileCopyrightText: © 2014 The glucometerutils Authors
# SPDX-License-Identifier: MIT
"""Driver for LifeScan OneTouch Ultra Easy devices.
Also supports OneTouch Ultra Mini devices (different name, same device).
Supported features:
- get readings;
- use the glucose unit preset on the device by default;
- get and set date and time;
- get serial number and software version;
- memory reset (caution!)
Expected device path: /dev/ttyUSB0 or similar serial port device.
"""
import binascii
import datetime
import logging
from typing import Any, Dict, Generator, Optional
import construct
from glucometerutils import common, driver
from glucometerutils.support import (
construct_extras,
lifescan,
lifescan_binary_protocol,
serial,
)
# Link-layer packet codec shared by every message this driver sends or receives.
# NOTE(review): the meaning of the positional ``True`` argument is defined in
# lifescan_binary_protocol — confirm there.
_PACKET = lifescan_binary_protocol.LifeScanPacket(True)

# Record id sent by Device._get_reading_count(); the reply to it is parsed as a
# reading count (_READING_COUNT_RESPONSE) rather than as a reading.
_INVALID_RECORD = 501

# Two-byte prefix expected at the start of most response messages below.
_COMMAND_SUCCESS = construct.Const(b"\x05\x06")

# --- Software version -------------------------------------------------------
_VERSION_REQUEST = construct.Const(b"\x05\x0d\x02")

_VERSION_RESPONSE = construct.Struct(
    _COMMAND_SUCCESS,
    # Length-prefixed (single byte) ASCII string.
    "version" / construct.PascalString(construct.Byte, encoding="ascii"),
)

# --- Serial number ----------------------------------------------------------
_SERIAL_NUMBER_REQUEST = construct.Const(
    b"\x05\x0B\x02\x00\x00\x00\x00\x00\x00\x00\x00\x00"
)

_SERIAL_NUMBER_RESPONSE = construct.Struct(
    _COMMAND_SUCCESS,
    # Everything after the success prefix is taken as the serial number.
    "serial_number" / construct.GreedyString(encoding="ascii"),
)

# --- Date and time ----------------------------------------------------------
_DATETIME_REQUEST = construct.Struct(
    construct.Const(b"\x05\x20"),  # 0x20 is the datetime
    "request_type" / construct.Enum(construct.Byte, write=0x01, read=0x02),
    # When the caller supplies no timestamp (as the "read" request does), the
    # default below is encoded in its place.
    "timestamp"
    / construct.Default(
        construct_extras.Timestamp(construct.Int32ul),  # type: ignore
        datetime.datetime(1970, 1, 1, 0, 0),
    ),
)

_DATETIME_RESPONSE = construct.Struct(
    _COMMAND_SUCCESS,
    "timestamp" / construct_extras.Timestamp(construct.Int32ul),  # type: ignore
)

# --- Glucose unit -----------------------------------------------------------
_GLUCOSE_UNIT_REQUEST = construct.Const(b"\x05\x09\x02\x09\x00\x00\x00\x00")

_GLUCOSE_UNIT_RESPONSE = construct.Struct(
    _COMMAND_SUCCESS,
    "unit" / lifescan_binary_protocol.GLUCOSE_UNIT,
    construct.Padding(3),
)

# --- Memory erase -----------------------------------------------------------
_MEMORY_ERASE_REQUEST = construct.Const(b"\x05\x1A")

# --- Stored readings --------------------------------------------------------
# Note that the count response starts with 0x0f, not with _COMMAND_SUCCESS.
_READING_COUNT_RESPONSE = construct.Struct(
    construct.Const(b"\x0f"),
    "count" / construct.Int16ul,
)

_READ_RECORD_REQUEST = construct.Struct(
    construct.Const(b"\x05\x1f"),
    "record_id" / construct.Int16ul,
)

_READING_RESPONSE = construct.Struct(
    _COMMAND_SUCCESS,
    "timestamp" / construct_extras.Timestamp(construct.Int32ul),  # type: ignore
    "value" / construct.Int32ul,
)
def _make_packet(
    message: bytes,
    sequence_number: int,
    expect_receive: bool,
    acknowledge: bool,
    disconnect: bool,
):
    """Serialize one packet with the module-level ``_PACKET`` codec.

    Args:
        message: Payload bytes to carry; may be empty.
        sequence_number: Value for the ``sequence_number`` link-control field.
        expect_receive: Value for the ``expect_receive`` link-control field.
        acknowledge: Value for the ``acknowledge`` link-control field.
        disconnect: Value for the ``disconnect`` link-control field.

    Returns:
        Whatever ``_PACKET.build`` produces for the assembled structure.
    """
    link_flags = {
        "sequence_number": sequence_number,
        "expect_receive": expect_receive,
        "acknowledge": acknowledge,
        "disconnect": disconnect,
    }
    payload = {"message": message, "link_control": link_flags}
    # The codec expects the payload nested under data -> value.
    return _PACKET.build({"data": {"value": payload}})
class Device(serial.SerialDevice, driver.GlucometerDevice):
    """Serial driver for LifeScan OneTouch Ultra Easy / Ultra Mini meters.

    Every request is wrapped in a ``_PACKET`` frame. The driver keeps two
    one-bit counters (``sent_counter_`` and ``expect_receive_``) that are
    toggled on each completed request/response exchange and are embedded in,
    and checked against, the link-control fields of the packets.
    """

    BAUDRATE = 9600
    DEFAULT_CABLE_ID = "067b:2303"  # Generic PL2303 cable.
    TIMEOUT = 0.5

    def __init__(self, device: Optional[str]) -> None:
        """Initialise the serial transport and reset the link-layer state.

        Args:
            device: Serial device path, forwarded unchanged to the parent
                class initialiser.
        """
        super().__init__(device)
        # Link-layer toggles; both start cleared and are flipped by
        # _send_request().
        self.sent_counter_ = False
        self.expect_receive_ = False
        self.buffered_reader_ = construct.Rebuffered(_PACKET, tailcutoff=1024)

    def connect(self) -> None:
        """Send an empty packet with the disconnect flag set and await the ack.

        Raises:
            lifescan.MalformedCommand: if building or parsing a packet fails.
        """
        try:
            self._send_packet(b"", disconnect=True)
            self._read_ack()
        except construct.ConstructError as e:
            raise lifescan.MalformedCommand(str(e)) from e

    def disconnect(self) -> None:
        """Perform the same empty disconnect-flagged exchange as connect()."""
        self.connect()

    def _send_packet(
        self, message: bytes, acknowledge: bool = False, disconnect: bool = False
    ) -> None:
        """Frame ``message`` with the current counters and write it out.

        Args:
            message: Payload bytes; empty for pure link-control packets.
            acknowledge: Value of the acknowledge link-control flag.
            disconnect: Value of the disconnect link-control flag.
        """
        pkt = _make_packet(
            message, self.sent_counter_, self.expect_receive_, acknowledge, disconnect
        )
        logging.debug("sending packet: %s", binascii.hexlify(pkt))
        # NOTE(review): self.serial_ is not set in this class; presumably it is
        # provided by serial.SerialDevice — confirm.
        self.serial_.write(pkt)
        self.serial_.flush()

    def _read_packet(self) -> construct.Container:
        """Read one packet from the serial port and check its sequence number.

        Returns:
            The parsed ``data.value`` container of the packet (its ``message``
            and ``link_control`` fields are used by the callers).

        Raises:
            lifescan.MalformedCommand: if the packet does not carry the
                disconnect flag and its sequence number differs from
                ``expect_receive_``.
        """
        raw_pkt = self.buffered_reader_.parse_stream(self.serial_).data
        logging.debug("received packet: %r", raw_pkt)
        # discard the checksum and copy
        pkt = raw_pkt.value
        if not pkt.link_control.disconnect and (
            pkt.link_control.sequence_number != self.expect_receive_
        ):
            # Report the same field that was just compared. The message used
            # to reference a non-existent ``link_connect.sequence_count``
            # attribute, so this path failed on the attribute lookup instead
            # of raising MalformedCommand.
            raise lifescan.MalformedCommand(
                f"at position 2[0b] expected {self.expect_receive_:02x}, "
                f"received {pkt.link_control.sequence_number:02x}"
            )
        return pkt

    def _send_ack(self) -> None:
        """Send an empty packet with only the acknowledge flag set."""
        self._send_packet(b"", acknowledge=True, disconnect=False)

    def _read_ack(self) -> None:
        """Read one packet and require it to be an acknowledgement."""
        pkt = self._read_packet()
        # NOTE(review): assert is stripped under ``python -O``; consider an
        # explicit exception if this check must always run.
        assert pkt.link_control.acknowledge

    def _send_request(
        self,
        request_format: construct.Struct,
        request_obj: Optional[Dict[str, Any]],
        response_format: construct.Struct,
    ) -> construct.Container:
        """Run one full request/response exchange with the meter.

        The sequence is: build and send the request, toggle ``sent_counter_``,
        read the meter's ack, read the response packet, toggle
        ``expect_receive_``, acknowledge the response, then parse its message.

        Args:
            request_format: Construct used to build the request bytes.
            request_obj: Values for ``request_format``; ``None`` for constant
                requests.
            response_format: Construct used to parse the response message.

        Returns:
            The parsed response.

        Raises:
            lifescan.MalformedCommand: if building the request or parsing any
                packet/response raises ``construct.ConstructError``.
        """
        try:
            request = request_format.build(request_obj)
            self._send_packet(request, acknowledge=False, disconnect=False)

            self.sent_counter_ = not self.sent_counter_
            self._read_ack()

            response_pkt = self._read_packet()
            # NOTE(review): stripped under ``python -O`` (see _read_ack).
            assert not response_pkt.link_control.acknowledge

            self.expect_receive_ = not self.expect_receive_
            self._send_ack()

            return response_format.parse(response_pkt.message)
        except construct.ConstructError as e:
            raise lifescan.MalformedCommand(str(e)) from e

    def get_meter_info(self) -> common.MeterInfo:
        """Query serial number, software version and unit from the meter."""
        return common.MeterInfo(
            "OneTouch Ultra Easy glucometer",
            serial_number=self.get_serial_number(),
            version_info=("Software version: " + self.get_version(),),
            native_unit=self.get_glucose_unit(),
        )

    def get_version(self) -> str:
        """Return the ``version`` field of the meter's version response."""
        response = self._send_request(_VERSION_REQUEST, None, _VERSION_RESPONSE)
        return response.version

    def get_serial_number(self) -> str:
        """Return the ``serial_number`` field of the serial-number response."""
        response = self._send_request(
            _SERIAL_NUMBER_REQUEST, None, _SERIAL_NUMBER_RESPONSE
        )
        return response.serial_number

    def get_datetime(self) -> datetime.datetime:
        """Send a "read" datetime request and return the reported timestamp."""
        response = self._send_request(
            _DATETIME_REQUEST, {"request_type": "read"}, _DATETIME_RESPONSE
        )
        return response.timestamp

    def _set_device_datetime(self, date: datetime.datetime) -> datetime.datetime:
        """Send a "write" datetime request carrying ``date``.

        Returns:
            The timestamp contained in the meter's response.
        """
        response = self._send_request(
            _DATETIME_REQUEST,
            {"request_type": "write", "timestamp": date},
            _DATETIME_RESPONSE,
        )
        return response.timestamp

    def zero_log(self) -> None:
        """Send the memory-erase request and expect a bare success reply."""
        self._send_request(_MEMORY_ERASE_REQUEST, None, _COMMAND_SUCCESS)

    def get_glucose_unit(self) -> common.Unit:
        """Return the ``unit`` field of the meter's glucose-unit response."""
        response = self._send_request(
            _GLUCOSE_UNIT_REQUEST, None, _GLUCOSE_UNIT_RESPONSE
        )
        return response.unit

    def _get_reading_count(self) -> int:
        """Return the count the meter reports for record ``_INVALID_RECORD``.

        The read-record request is sent with ``_INVALID_RECORD`` as its id and
        the reply is parsed as a count response instead of a reading.
        """
        response = self._send_request(
            _READ_RECORD_REQUEST,
            {"record_id": _INVALID_RECORD},
            _READING_COUNT_RESPONSE,
        )
        return response.count

    def _get_reading(self, record_id: int) -> common.GlucoseReading:
        """Fetch record ``record_id`` and wrap it as a glucose reading."""
        response = self._send_request(
            _READ_RECORD_REQUEST, {"record_id": record_id}, _READING_RESPONSE
        )
        return common.GlucoseReading(response.timestamp, float(response.value))

    def get_readings(self) -> Generator[common.AnyReading, None, None]:
        """Yield records ``0 .. count - 1`` in order, one request per record."""
        record_count = self._get_reading_count()
        for record_id in range(record_count):
            yield self._get_reading(record_id)