# -*- coding: utf-8 -*-
#
# SPDX-FileCopyrightText: © 2019 The glucometerutils Authors
# SPDX-License-Identifier: MIT
"""Driver for TaiDoc TD-4277 devices.

Supported features:
    - get readings, including pre-/post-meal notes;
    - get and set date and time;
    - get serial number (partial);
    - memory reset (caution!)

Expected device path: 0001:001c:00 (libusb), /dev/hidraw1 (Linux).
"""

import binascii
import datetime
import enum
import functools
import logging
import operator
from typing import Generator, NoReturn, Optional, Tuple

import construct

from glucometerutils import common, driver, exceptions
from glucometerutils.support import serial


class Direction(enum.Enum):
    """Direction marker byte carried near the end of every packet.

    ``Out`` is the default used by ``_make_packet`` for packets written to the
    device. NOTE(review): ``In`` is presumably what the device uses for its
    replies -- confirm against the protocol documentation.
    """

    In = 0xA5
    Out = 0xA3


def byte_checksum(data: bytes) -> int:
    """Return the low 8 bits of the sum of all bytes in *data*.

    An explicit initial value of 0 is passed to ``reduce`` so that empty input
    yields 0 instead of raising ``TypeError``.
    """
    return functools.reduce(operator.add, data, 0) & 0xFF


# Wire format of a packet, as declared below:
#   1 byte   constant 0x51
#   1 byte   command
#   4 bytes  message payload
#   1 byte   direction (see Direction)
#   1 byte   checksum, computed by byte_checksum() over the raw bytes of all
#            the preceding fields (``data.data`` is the RawCopy raw buffer).
_PACKET = construct.Struct(
    "data"
    / construct.RawCopy(
        construct.Struct(
            construct.Const(b"\x51"),
            "command" / construct.Byte,
            "message" / construct.Bytes(4),
            "direction"
            / construct.Mapping(construct.Byte, {e: e.value for e in Direction}),
        ),
    ),
    "checksum"
    / construct.Checksum(construct.Byte, byte_checksum, construct.this.data.data),
)

# Payload used for commands that carry no arguments.
_EMPTY_MESSAGE = b"\x00\x00\x00\x00"

_CONNECT_REQUEST = 0x22
# Command codes accepted in the reply to _CONNECT_REQUEST (see Device.connect).
_VALID_CONNECT_RESPONSE = {0x22, 0x24, 0x54}

_GET_DATETIME = 0x23
_SET_DATETIME = 0x33

_GET_MODEL = 0x24

_GET_READING_COUNT = 0x2B
_GET_READING_DATETIME = 0x25
_GET_READING_VALUE = 0x26

_CLEAR_MEMORY = 0x52

# The reply to _GET_MODEL must start with these two constant bytes; the two
# trailing bytes are parsed but not named or used.
_MODEL_STRUCT = construct.Struct(
    construct.Const(b"\x77\x42"),
    construct.Byte,
    construct.Byte,
)

# Date/time payload: a little-endian 16-bit packed date word followed by the
# minute and hour bytes.
_DATETIME_STRUCT = construct.Struct(
    "day" / construct.Int16ul,
    "minute" / construct.Byte,
    "hour" / construct.Byte,
)

# Bit layout of the packed date word once re-serialised as big-endian:
# 7 bits of year, 4 bits of month, 5 bits of day.
_DAY_BITSTRUCT = construct.BitStruct(
    "year" / construct.BitsInteger(7),
    "month" / construct.BitsInteger(4),
    "day" / construct.BitsInteger(5),
)

# The packed year field is an offset from this base year.
_BASE_YEAR = 2000
# Largest offset that fits in the 7-bit year field of _DAY_BITSTRUCT.
_MAX_YEAR_OFFSET = 2**7 - 1

# Reply to _GET_READING_COUNT: the count, then a 16-bit word that is ignored.
_READING_COUNT_STRUCT = construct.Struct(
    "count" / construct.Int16ul,
    construct.Int16ul,
)

# Request payload selecting a stored reading: record id plus two zero bytes.
_READING_SELECTION_STRUCT = construct.Struct(
    "record_id" / construct.Int16ul,
    construct.Const(b"\x00\x00"),
)

# Byte values used on the wire for each meal annotation.
_MEAL_FLAG = {
    common.Meal.NONE: 0x00,
    common.Meal.BEFORE: 0x40,
    common.Meal.AFTER: 0x80,
}

# Reply to _GET_READING_VALUE: the 16-bit value, a constant 0x06 byte, and the
# meal flag byte.
_READING_VALUE_STRUCT = construct.Struct(
    "value" / construct.Int16ul,
    construct.Const(b"\x06"),
    "meal" / construct.Mapping(construct.Byte, _MEAL_FLAG),
)


def _make_packet(
    command: int, message: bytes, direction: Direction = Direction.Out
) -> bytes:
    """Serialise a packet for *command* carrying the 4-byte *message*.

    The constant start byte and the checksum are filled in by ``_PACKET``.
    """
    return _PACKET.build(
        {
            "data": {
                "value": {
                    "command": command,
                    "message": message,
                    "direction": direction,
                },
            }
        }
    )


def _parse_datetime(message: bytes) -> datetime.datetime:
    """Decode a 4-byte date/time payload into a naive ``datetime``.

    Raises whatever ``construct`` or ``datetime.datetime`` raise when the
    payload is malformed or encodes an invalid calendar date.
    """
    date = _DATETIME_STRUCT.parse(message)
    # We can't parse the day properly with a single pass of Construct
    # unfortunately: the word is read little-endian, then re-serialised as
    # big-endian so that the bit fields line up.
    day = _DAY_BITSTRUCT.parse(construct.Int16ub.build(date.day))

    return datetime.datetime(
        _BASE_YEAR + day.year, day.month, day.day, date.hour, date.minute
    )


def _select_record(record_id: int) -> bytes:
    """Build the request payload that selects reading number *record_id*."""
    return _READING_SELECTION_STRUCT.build({"record_id": record_id})


class Device(serial.SerialDevice, driver.GlucometerDevice):
    """Driver for the TaiDoc TD-4277, opened through a ``cp2110://`` URL."""

    # NOTE(review): presumably consumed by serial.SerialDevice when opening
    # the port -- confirm there.
    BAUDRATE = 19200
    TIMEOUT = 0.5

    def __init__(self, device: Optional[str]):
        """Open *device* and prepare the buffered packet reader."""
        super().__init__(f"cp2110://{device}")
        self.buffered_reader_ = construct.Rebuffered(_PACKET, tailcutoff=1024)

    def _send_command(
        self,
        command: int,
        message: bytes = _EMPTY_MESSAGE,
        validate_response: bool = True,
    ) -> Tuple[int, bytes]:
        """Send one command packet and read back one response packet.

        Args:
            command: command code to send.
            message: 4-byte payload; defaults to all zeroes.
            validate_response: when true, require the response to carry the
                same command code as the request.

        Returns:
            The ``(command, message)`` pair of the response packet.

        Raises:
            exceptions.InvalidResponse: if validation is enabled and the
                response command differs from the one sent.
        """
        pkt = _make_packet(command, message)
        logging.debug("sending packet: %s", binascii.hexlify(pkt))

        self.serial_.write(pkt)
        self.serial_.flush()
        response = self.buffered_reader_.parse_stream(self.serial_)
        logging.debug("received packet: %r", response)

        if validate_response and response.data.value.command != command:
            raise exceptions.InvalidResponse(response)

        return response.data.value.command, response.data.value.message

    def connect(self) -> None:
        """Handshake with the meter and check that it reports the right model.

        Raises:
            exceptions.ConnectionFailed: if the connect reply carries an
                unexpected command code, or the model reply does not match
                ``_MODEL_STRUCT``.
        """
        # The connect reply may legitimately carry a different command code,
        # so it is checked against the accepted set instead of the request.
        response_command, message = self._send_command(
            _CONNECT_REQUEST, validate_response=False
        )
        if response_command not in _VALID_CONNECT_RESPONSE:
            raise exceptions.ConnectionFailed(
                f"Invalid response received: {response_command:02x} {message!r}"
            )

        _, model_message = self._send_command(_GET_MODEL)
        try:
            _MODEL_STRUCT.parse(model_message)
        except construct.ConstructError as err:
            # Chain the parse error so the root cause stays in the traceback.
            raise exceptions.ConnectionFailed(
                f"Invalid model identified: {model_message!r}"
            ) from err

    def disconnect(self) -> None:
        """Do nothing; this driver sends no packet on disconnect."""

    def get_meter_info(self) -> common.MeterInfo:
        """Return a ``MeterInfo`` carrying only the model description."""
        return common.MeterInfo("TaiDoc TD-4277 glucometer")

    def get_version(self) -> NoReturn:  # pylint: disable=no-self-use
        """Not implemented by this driver."""
        raise NotImplementedError

    def get_serial_number(self) -> NoReturn:  # pylint: disable=no-self-use
        """Not implemented by this driver."""
        raise NotImplementedError

    def get_datetime(self) -> datetime.datetime:
        """Query the meter clock and return it as a naive ``datetime``."""
        _, message = self._send_command(_GET_DATETIME)

        return _parse_datetime(message)

    def _set_device_datetime(self, date: datetime.datetime) -> datetime.datetime:
        """Set the meter clock to *date* and return the time it reports back.

        Only year, month, day, hour and minute are transmitted.

        Raises:
            ValueError: if the year cannot be stored in the 7-bit year field
                (i.e. it is outside 2000..2127).
        """
        year_offset = date.year - _BASE_YEAR
        # Validate explicitly rather than with ``assert`` (which is stripped
        # under ``python -O``), and reject years too large for the bit field.
        if not 0 <= year_offset <= _MAX_YEAR_OFFSET:
            raise ValueError(
                f"year {date.year} is outside the supported range "
                f"{_BASE_YEAR}..{_BASE_YEAR + _MAX_YEAR_OFFSET}"
            )

        day_struct = _DAY_BITSTRUCT.build(
            {"year": year_offset, "month": date.month, "day": date.day}
        )

        # Mirror of _parse_datetime: the bit-packed word is big-endian, while
        # _DATETIME_STRUCT writes it out little-endian.
        day_word = construct.Int16ub.parse(day_struct)

        date_message = _DATETIME_STRUCT.build(
            {"day": day_word, "minute": date.minute, "hour": date.hour}
        )

        _, message = self._send_command(_SET_DATETIME, message=date_message)

        return _parse_datetime(message)

    def _get_reading_count(self) -> int:
        """Return the number of readings the meter reports as stored."""
        _, message = self._send_command(_GET_READING_COUNT)
        return _READING_COUNT_STRUCT.parse(message).count

    def _get_reading(self, record_id: int) -> common.GlucoseReading:
        """Fetch the timestamp and the value of reading number *record_id*.

        Two round-trips are needed: one for the date/time and one for the
        value together with its meal flag.
        """
        _, reading_date_message = self._send_command(
            _GET_READING_DATETIME, _select_record(record_id)
        )
        reading_date = _parse_datetime(reading_date_message)

        _, reading_value_message = self._send_command(
            _GET_READING_VALUE, _select_record(record_id)
        )
        reading_value = _READING_VALUE_STRUCT.parse(reading_value_message)

        return common.GlucoseReading(
            reading_date, reading_value.value, meal=reading_value.meal
        )

    def get_readings(self) -> Generator[common.AnyReading, None, None]:
        """Yield every stored reading, from record 0 up to the reported count."""
        record_count = self._get_reading_count()
        for record_id in range(record_count):
            yield self._get_reading(record_id)

    def zero_log(self) -> None:
        """Send the clear-memory command to the meter."""
        self._send_command(_CLEAR_MEMORY)

    def get_glucose_unit(self) -> NoReturn:
        """Maybe this could be implemented by someone who knows the device"""
        raise NotImplementedError