summaryrefslogblamecommitdiffstats
path: root/glucometerutils/drivers/otultraeasy.py
blob: 22798c0806910c79cbb9a1f73161826b43f7a7e2 (plain) (tree)
1
2
3
4
                       
 
                                                             
                              












                                                                        
 
               
               
              
                                                 

                
 
                                          

                                     



                             
 
                                                       


                     
                                               
 
                                                   

                                     

                                                                     


                                         

                                                       

                                           
                                                                                     


                                     


                                                                       
                                                                      

                                            


                                      

                                                                            

 
                                                                            
 

                                          


                                               

 
                                                    

                                           
                                                            


                                        
                                                                    


                                     


                                                                            
 
 
 






                         
                         













                                                           
 
 
                                                           
                   
                                                           
                 
 

                                                      
 

                                    
                                                                              
 
                              
            
                                                   


                                                   
 
                                 
                      
 


                                                                                 
                           


                                                                                      
 

                               
 
                                                  
                                                                       
                                                     
 

                                       
 
                                                

                                                                    
                                            
                                                                                                                        
             
 
                  
 
                                
                                                                  
 
                                
                                 
                                           
 





                                              
            
                                                       
                                                                           
 

                                                       
 
                                              
                                                            
 

                                                           
 
                                                              

                                                   
 
                                                 
                                
                                             
                                                   


                                                                      
 
                                 
                                                                                
 
                               
 
                                       
                                      

                                                                 
 
                                     
 
                                                
                                      

                                                                           
 
                                 
 
                                                                                 
                                      
                              
                                                         

                               
                                 
 
                               
                                                                         
 
                                              
                                      

                                                               
 
                            
 
                                        
                                      



                                           

                             
                                                                    
                                      

                                                                             
 
                                                                               
 
                                                                       


                                                
# -*- coding: utf-8 -*-
#
# SPDX-FileCopyrightText: © 2014 The glucometerutils Authors
# SPDX-License-Identifier: MIT
"""Driver for LifeScan OneTouch Ultra Easy devices.

Also supports OneTouch Ultra Mini devices (different name, same device).

Supported features:
    - get readings;
    - use the glucose unit preset on the device by default;
    - get and set date and time;
    - get serial number and software version;
    - memory reset (caution!)

Expected device path: /dev/ttyUSB0 or similar serial port device.
"""

import binascii
import datetime
import logging
from typing import Any, Dict, Generator, Optional

import construct

from glucometerutils import common, driver
from glucometerutils.support import (
    construct_extras,
    lifescan,
    lifescan_binary_protocol,
    serial,
)

# Packet framing construct shared by every exchange in this driver: it is used
# to build outgoing packets (see _make_packet) and to parse incoming ones.
# NOTE(review): the meaning of the positional ``True`` argument is defined in
# lifescan_binary_protocol and is not visible from this file.
_PACKET = lifescan_binary_protocol.LifeScanPacket(True)

# Record id sent with the read-record request by Device._get_reading_count();
# the reply to it is parsed as _READING_COUNT_RESPONSE rather than as a reading.
_INVALID_RECORD = 501

# Two-byte prefix that the response layouts below expect at the start of a
# reply message.
_COMMAND_SUCCESS = construct.Const(b"\x05\x06")

# Fixed request message used by Device.get_version().
_VERSION_REQUEST = construct.Const(b"\x05\x0d\x02")

# Reply to the version request: success prefix followed by an ASCII string
# whose length is given by a single leading byte.
_VERSION_RESPONSE = construct.Struct(
    success=_COMMAND_SUCCESS,
    version=construct.PascalString(construct.Byte, encoding="ascii"),
)

# Fixed request message used by Device.get_serial_number().
_SERIAL_NUMBER_REQUEST = construct.Const(
    b"\x05\x0B\x02\x00\x00\x00\x00\x00\x00\x00\x00\x00"
)

# Reply to the serial number request: success prefix, then every remaining
# byte of the message decoded as ASCII.
_SERIAL_NUMBER_RESPONSE = construct.Struct(
    success=_COMMAND_SUCCESS, serial_number=construct.GreedyString(encoding="ascii"),
)

# Date/time request, used both for reading and for writing the meter clock.
# When the caller supplies no ``timestamp`` (as Device.get_datetime() does),
# the default below is serialized instead.
# NOTE(review): the default presumably encodes as timestamp 0 -- confirm
# against construct_extras.Timestamp.
_DATETIME_REQUEST = construct.Struct(
    const=construct.Const(b"\x05\x20"),  # 0x20 is the datetime
    request_type=construct.Enum(construct.Byte, write=0x01, read=0x02),
    timestamp=construct.Default(
        construct_extras.Timestamp(construct.Int32ul),  # type: ignore
        datetime.datetime(1970, 1, 1, 0, 0),
    ),
)

# Reply to a date/time request: success prefix followed by a timestamp stored
# in a 32-bit unsigned little-endian field.
_DATETIME_RESPONSE = construct.Struct(
    success=_COMMAND_SUCCESS,
    timestamp=construct_extras.Timestamp(construct.Int32ul),  # type: ignore
)

# Fixed request message used by Device.get_glucose_unit().
_GLUCOSE_UNIT_REQUEST = construct.Const(b"\x05\x09\x02\x09\x00\x00\x00\x00")


# Reply to the glucose unit request: success prefix, the unit field (layout
# defined in lifescan_binary_protocol) and three padding bytes.
_GLUCOSE_UNIT_RESPONSE = construct.Struct(
    success=_COMMAND_SUCCESS,
    unit=lifescan_binary_protocol.GLUCOSE_UNIT,
    padding=construct.Padding(3),
)

# Fixed request message used by Device.zero_log(); its reply is parsed with
# _COMMAND_SUCCESS alone.
_MEMORY_ERASE_REQUEST = construct.Const(b"\x05\x1A")

# Reply parsed by Device._get_reading_count(): a one-byte constant followed by
# a 16-bit unsigned little-endian count. Note that it does not start with
# _COMMAND_SUCCESS.
_READING_COUNT_RESPONSE = construct.Struct(
    const=construct.Const(b"\x0f"), count=construct.Int16ul,
)

# Request for a single stored record, addressed by a 16-bit unsigned
# little-endian record id.
_READ_RECORD_REQUEST = construct.Struct(
    const=construct.Const(b"\x05\x1f"), record_id=construct.Int16ul,
)

# Reply carrying one stored reading: success prefix, timestamp and a 32-bit
# unsigned little-endian value.
# NOTE(review): the unit of ``value`` is not stated here; Device._get_reading()
# passes it to common.GlucoseReading as a float -- confirm the expected unit.
_READING_RESPONSE = construct.Struct(
    success=_COMMAND_SUCCESS,
    timestamp=construct_extras.Timestamp(construct.Int32ul),  # type: ignore
    value=construct.Int32ul,
)


def _make_packet(
    message: bytes,
    sequence_number: int,
    expect_receive: bool,
    acknowledge: bool,
    disconnect: bool,
):
    """Serialize a single packet carrying ``message`` and its link-control flags.

    Args:
        message: Payload bytes to place in the packet (may be empty).
        sequence_number: Value for the ``sequence_number`` link-control field.
        expect_receive: Value for the ``expect_receive`` link-control field.
        acknowledge: Value for the ``acknowledge`` link-control field.
        disconnect: Value for the ``disconnect`` link-control field.

    Returns:
        Whatever ``_PACKET.build()`` produces for the assembled description
        (the encoded packet that callers write to the serial port).
    """
    link_flags = {
        "sequence_number": sequence_number,
        "expect_receive": expect_receive,
        "acknowledge": acknowledge,
        "disconnect": disconnect,
    }
    payload = {"message": message, "link_control": link_flags}

    # _PACKET expects the payload nested under data -> value.
    return _PACKET.build({"data": {"value": payload}})


class Device(serial.SerialDevice, driver.GlucometerDevice):
    """Driver for OneTouch Ultra Easy (and Ultra Mini) meters on a serial port.

    Every request is wrapped in a ``_PACKET`` frame. The driver keeps two
    boolean link-layer counters: ``sent_counter_`` (sequence number placed in
    outgoing packets) and ``expect_receive_`` (sequence number expected in
    incoming packets, also echoed in outgoing ones). Both are toggled by
    ``_send_request`` as a request/response exchange progresses.
    """

    BAUDRATE = 9600
    DEFAULT_CABLE_ID = "067b:2303"  # Generic PL2303 cable.
    TIMEOUT = 0.5

    def __init__(self, device: Optional[str]) -> None:
        """Initialize the serial device and reset the link-layer state.

        Args:
            device: Serial device path handed to the base class, or None.
        """
        super().__init__(device)

        self.sent_counter_ = False
        self.expect_receive_ = False
        self.buffered_reader_ = construct.Rebuffered(_PACKET, tailcutoff=1024)

    def connect(self) -> None:
        """Send an empty packet with the disconnect flag set and await its ack.

        NOTE(review): presumably this resets the link-layer state on the meter
        side; the local counters are not reset here -- confirm.

        Raises:
            lifescan.MalformedCommand: if the packet cannot be built or the
                reply cannot be parsed, or if the reply is not an acknowledge.
        """
        try:
            self._send_packet(b"", disconnect=True)
            self._read_ack()
        except construct.ConstructError as e:
            raise lifescan.MalformedCommand(str(e)) from e

    def disconnect(self) -> None:
        """Perform the same disconnect-flagged exchange as ``connect()``."""
        self.connect()

    def _send_packet(
        self, message: bytes, acknowledge: bool = False, disconnect: bool = False
    ) -> None:
        """Frame ``message`` with the current counters and write it out.

        Args:
            message: Payload bytes (empty for pure link-control packets).
            acknowledge: Whether to set the acknowledge flag.
            disconnect: Whether to set the disconnect flag.
        """
        pkt = _make_packet(
            message, self.sent_counter_, self.expect_receive_, acknowledge, disconnect
        )
        logging.debug("sending packet: %s", binascii.hexlify(pkt))

        self.serial_.write(pkt)
        self.serial_.flush()

    def _read_packet(self) -> construct.Container:
        """Parse the next packet from the serial port and check its sequence.

        Returns:
            The parsed packet value (``message`` and ``link_control``).

        Raises:
            lifescan.MalformedCommand: if the packet is not flagged as a
                disconnect and its sequence number differs from the expected
                one.
        """
        raw_pkt = self.buffered_reader_.parse_stream(self.serial_).data
        logging.debug("received packet: %r", raw_pkt)

        # discard the checksum and copy
        pkt = raw_pkt.value

        link_control = pkt.link_control
        if not link_control.disconnect and (
            link_control.sequence_number != self.expect_receive_
        ):
            # Report the same field that was just compared; the message used
            # to reference non-existent attributes and raised AttributeError
            # instead of the intended protocol error.
            raise lifescan.MalformedCommand(
                f"at position 2[0b] expected {self.expect_receive_:02x}, "
                f"received {link_control.sequence_number:02x}"
            )

        return pkt

    def _send_ack(self) -> None:
        """Send an empty packet with only the acknowledge flag set."""
        self._send_packet(b"", acknowledge=True, disconnect=False)

    def _read_ack(self) -> None:
        """Read one packet and require it to be an acknowledge.

        Raises:
            lifescan.MalformedCommand: if the packet read does not have the
                acknowledge flag set.
        """
        pkt = self._read_packet()
        # Explicit check rather than ``assert`` so that it still runs when
        # Python is started with -O.
        if not pkt.link_control.acknowledge:
            raise lifescan.MalformedCommand(
                "expected an acknowledge packet, received a data packet"
            )

    def _send_request(
        self,
        request_format: construct.Construct,
        request_obj: Optional[Dict[str, Any]],
        response_format: construct.Construct,
    ) -> construct.Container:
        """Run one request/response exchange and parse the reply.

        The sequence is: send the request, toggle ``sent_counter_``, read the
        meter's acknowledge, read the response packet, toggle
        ``expect_receive_`` and acknowledge the response.

        Args:
            request_format: Construct used to build the request message.
            request_obj: Values for ``request_format``, or None for constants.
            response_format: Construct used to parse the response message.

        Returns:
            The result of parsing the response message with
            ``response_format``.

        Raises:
            lifescan.MalformedCommand: if building or parsing fails, or if the
                meter's packets do not follow the expected ack/data sequence.
        """
        try:
            request = request_format.build(request_obj)
            self._send_packet(request, acknowledge=False, disconnect=False)

            self.sent_counter_ = not self.sent_counter_
            self._read_ack()

            response_pkt = self._read_packet()
            # Explicit check rather than ``assert`` so that it still runs when
            # Python is started with -O.
            if response_pkt.link_control.acknowledge:
                raise lifescan.MalformedCommand(
                    "expected a data packet, received an acknowledge packet"
                )

            self.expect_receive_ = not self.expect_receive_
            self._send_ack()

            return response_format.parse(response_pkt.message)
        except construct.ConstructError as e:
            raise lifescan.MalformedCommand(str(e)) from e

    def get_meter_info(self) -> common.MeterInfo:
        """Query serial number, software version and unit from the meter."""
        return common.MeterInfo(
            "OneTouch Ultra Easy glucometer",
            serial_number=self.get_serial_number(),
            version_info=("Software version: " + self.get_version(),),
            native_unit=self.get_glucose_unit(),
        )

    def get_version(self) -> str:
        """Return the version string reported by the meter."""
        response = self._send_request(_VERSION_REQUEST, None, _VERSION_RESPONSE)

        return response.version

    def get_serial_number(self) -> str:
        """Return the serial number string reported by the meter."""
        response = self._send_request(
            _SERIAL_NUMBER_REQUEST, None, _SERIAL_NUMBER_RESPONSE
        )

        return response.serial_number

    def get_datetime(self) -> datetime.datetime:
        """Return the date and time reported by the meter."""
        response = self._send_request(
            _DATETIME_REQUEST, {"request_type": "read"}, _DATETIME_RESPONSE
        )

        return response.timestamp

    def _set_device_datetime(self, date: datetime.datetime) -> datetime.datetime:
        """Write ``date`` to the meter and return the timestamp it replies with."""
        response = self._send_request(
            _DATETIME_REQUEST,
            {"request_type": "write", "timestamp": date},
            _DATETIME_RESPONSE,
        )
        return response.timestamp

    def zero_log(self) -> None:
        """Send the memory erase request and require a success reply."""
        self._send_request(_MEMORY_ERASE_REQUEST, None, _COMMAND_SUCCESS)

    def get_glucose_unit(self) -> common.Unit:
        """Return the glucose unit reported by the meter."""
        response = self._send_request(
            _GLUCOSE_UNIT_REQUEST, None, _GLUCOSE_UNIT_RESPONSE
        )

        return response.unit

    def _get_reading_count(self) -> int:
        """Return the number of stored readings.

        The count is obtained by requesting record ``_INVALID_RECORD`` and
        parsing the reply as a count response.
        """
        response = self._send_request(
            _READ_RECORD_REQUEST,
            {"record_id": _INVALID_RECORD},
            _READING_COUNT_RESPONSE,
        )
        return response.count

    def _get_reading(self, record_id: int) -> common.GlucoseReading:
        """Fetch the record with id ``record_id`` as a glucose reading."""
        response = self._send_request(
            _READ_RECORD_REQUEST, {"record_id": record_id}, _READING_RESPONSE
        )

        return common.GlucoseReading(response.timestamp, float(response.value))

    def get_readings(self) -> Generator[common.AnyReading, None, None]:
        """Yield every stored reading, from record id 0 up to count - 1."""
        record_count = self._get_reading_count()
        for record_id in range(record_count):
            yield self._get_reading(record_id)