summaryrefslogblamecommitdiffstats
path: root/glucometerutils/support/contourusb.py
blob: 1b2f802f4ed564182cfb78c7ff71fa7d34261bce (plain) (tree)
1
2
3
4
5
6
7
8
9
10
11
12
13
14

                       
                                                             










                                                               
               
           
         
                                                         
 
                                                          




















                                                                              

                                                 




                                                                             

                          

                            




                                                      



                            
 









                            
                                                     

                                                                  
 
                  
 


                                  
 
                                                                                     


                                                                      



                                     
                                           
                       

                                               
                                             

                     
                               

                          
                                                               
                                               
                                    
 
                                     
 

                                                                



                                               

















                                                          

                 


                                                

                 







                                                            

                 







                                                            

                 
                                            
 

                                                
                                                
                                                





                                                                   

                                                                             
 
                                                 







                                                                   
                                         






                                      
                             
                                                                             

             


                                                                 
                             
                                                                                               

                      

                                               
                                  

                      
                                                     





                                          
                                   

                       
                                  



                                                            
                                                    
                                 
                                                                                        



















                                                                            
                                  


                                                                          
                                        


                                                      
                                       


                                              
                                                






                                               

              
 
                                                 




                                                                   
                                   
            
                            



                                   
                                                                  




                                          
                                                







                                                  
                                        

                                            
                                           

                                            
                                                    
                             
                                       



                                                            
                                        
                                              
                                      
                                                                 

                                                                 
                                    


                              
                                                               
                                               
                                 


                                     
                                                       







                                                                               
                             


                                                                               
 

                                            
                                                 
# -*- coding: utf-8 -*-
#
# SPDX-FileCopyrightText: © 2019 The glucometerutils Authors
# SPDX-License-Identifier: MIT
"""Common routines to implement the ContourUSB common protocol.

Protocol documentation available from Ascensia at
http://protocols.ascensia.com/Programming-Guide.aspx

* glucodump code segments are developed by Anders Hammarquist
* Rest of code is developed by Arvanitis Christos

"""

import datetime
import enum
import re
from typing import Dict, Generator, List, Optional, Tuple

from glucometerutils.support import driver_base, hiddevice

# regexr.com/4k6jb
_HEADER_RECORD_RE = re.compile(
    "^(?P<record_type>[a-zA-Z])\\|(?P<field_del>.)(?P<repeat_del>.)"
    "(?P<component_del>.)(?P<escape_del>.)\\|\\w*\\|(?P<product_code>\\w+)"
    "\\^(?P<dig_ver>[0-9]{2}\\.[0-9]{2})\\\\(?P<anlg_ver>[0-9]{2}\\.[0-9]{2})"
    "\\\\(?P<agp_ver>[0-9]{2}\\.[0-9]{2}\\.[0-9]{2})\\"
    "^(?P<serial_num>(\\w|-)+)\\^(?P<sku_id>(\\w|-)+)\\|"
    "A=(?P<res_marking>[0-9])\\^C=(?P<config_bits>[0-9]+)\\"
    "^G=(?P<lang>[0-9]+)\\^I=(?P<interv>[0-9]+)\\^R=(?P<ref_method>[0-9]+)\\"
    "^S=(?P<internal>[0-9]+)\\^U=(?P<unit>[0-9]+)\\"
    "^V=(?P<lo_bound>[0-9]{2})(?P<hi_bound>[0-9]{3})\\"
    "^X=(?P<hypo_limit>[0-9]{3})(?P<overall_low>[0-9]{3})"
    "(?P<pre_food_low>[0-9]{3})(?P<post_food_low>[0-9]{3})"
    "(?P<overall_high>[0-9]{3})(?P<pre_food_high>[0-9]{3})"
    "(?P<post_food_high>[0-9]{3})(?P<hyper_limit>[0-9]{3})\\"
    "^Y=(?P<upp_hyper>[0-9]{3})(?P<low_hyper>[0-9]{3})"
    "(?P<upp_hypo>[0-9]{3})(?P<low_hypo>[0-9]{3})(?P<upp_low_target>[0-9]{3})"
    "(?P<low_low_target>[0-9]{3})(?P<upp_hi_target>[0-9]{3})"
    "(?P<low_hi_target>[0-9]{3})\\^Z=(?P<trends>[0-2])\\|"
    "(?P<total>[0-9]*)\\|\\|\\|\\|\\|\\|"
    "(?P<spec_ver>[0-9]+)\\|(?P<datetime>[0-9]+)"
)

_RESULT_RECORD_RE = re.compile(
    "^(?P<record_type>[a-zA-Z])\\|(?P<seq_num>[0-9]+)\\|\\w*\\^\\w*\\^\\w*\\"
    "^(?P<test_id>\\w+)\\|(?P<value>[0-9]+)\\|(?P<unit>\\w+\\/\\w+)\\^"
    "(?P<ref_method>[BPD])\\|\\|(?P<markers>[><BADISXCZ\\/1-12]*)\\|\\|"
    "(?P<datetime>[0-9]+)"
)

_RECORD_FORMAT = re.compile(
    "\x02(?P<check>(?P<recno>[0-7])(?P<text>[^\x0d]*)"
    "\x0d(?P<end>[\x03\x17]))"
    "(?P<checksum>[0-9A-F][0-9A-F])\x0d\x0a"
)


class FrameError(Exception):
    pass


@enum.unique
class Mode(enum.Enum):
    """Operation modes."""

    ESTABLISH = enum.auto()
    DATA = enum.auto()
    PRECOMMAND = enum.auto()
    COMMAND = enum.auto()


class ContourHidDevice(driver_base.GlucometerDriver):
    """Base class implementing the ContourUSB HID common protocol.
    """

    blocksize = 64

    state: Optional[Mode] = None

    currecno: Optional[int] = None

    def __init__(self, usb_ids: Tuple[int, int], device_path: Optional[str]) -> None:
        super().__init__(device_path)
        self._hid_session = hiddevice.HidSession(usb_ids, device_path)

    def read(self, r_size=blocksize):
        result = []

        while True:
            data = self._hid_session.read()
            dstr = data
            data_end_idx = data[3] + 4
            result.append(dstr[4:data_end_idx])
            if data[3] != self.blocksize - 4:
                break

        return b"".join(result)

    def write(self, data):
        data = b"ABC" + chr(len(data)).encode() + data.encode()
        pad_length = self.blocksize - len(data)
        data += pad_length * b"\x00"

        self._hid_session.write(data)

    USB_VENDOR_ID: int = 0x1A79  # Bayer Health Care LLC Contour
    USB_PRODUCT_ID: int = 0x6002

    def parse_header_record(self, text):
        header = _HEADER_RECORD_RE.search(text)

        self.field_del = header.group("field_del")
        self.repeat_del = header.group("repeat_del")
        self.component_del = header.group("component_del")
        self.escape_del = header.group("escape_del")

        self.product_code = header.group("product_code")
        self.dig_ver = header.group("dig_ver")
        self.anlg_ver = header.group("anlg_ver")
        self.agp_ver = header.group("agp_ver")

        self.serial_num = header.group("serial_num")
        self.sku_id = header.group("sku_id")
        self.res_marking = header.group("res_marking")
        self.config_bits = header.group("config_bits")
        self.lang = header.group("lang")
        self.interv = header.group("interv")
        self.ref_method = header.group("ref_method")
        self.internal = header.group("internal")

        # U limit
        self.unit = header.group("unit")
        self.lo_bound = header.group("lo_bound")
        self.hi_bound = header.group("hi_bound")

        # X field
        self.hypo_limit = header.group("hypo_limit")
        self.overall_low = header.group("overall_low")
        self.pre_food_low = header.group("pre_food_low")
        self.post_food_low = header.group("post_food_low")
        self.overall_high = header.group("overall_high")
        self.pre_food_high = header.group("pre_food_high")
        self.post_food_high = header.group("post_food_high")
        self.hyper_limit = header.group("hyper_limit")

        # Y field
        self.upp_hyper = header.group("upp_hyper")
        self.low_hyper = header.group("low_hyper")
        self.upp_hypo = header.group("upp_hypo")
        self.low_hypo = header.group("low_hypo")
        self.upp_low_target = header.group("upp_low_target")
        self.low_low_target = header.group("low_low_target")
        self.upp_hi_target = header.group("upp_hi_target")
        self.low_hi_target = header.group("low_hi_target")

        # Z field
        self.trends = header.group("trends")

        self.total = header.group("total")
        self.spec_ver = header.group("spec_ver")
        # Datetime string in YYYYMMDDHHMM format
        self.datetime = header.group("datetime")

    def checksum(self, text):
        """
        Implemented by Anders Hammarquist for glucodump project
        More info: https://bitbucket.org/iko/glucodump/src/default/
        """
        checksum = hex(sum(ord(c) for c in text) % 256).upper().split("X")[1]
        return ("00" + checksum)[-2:]

    def checkframe(self, frame) -> Optional[str]:
        """
        Implemented by Anders Hammarquist for glucodump project
        More info: https://bitbucket.org/iko/glucodump/src/default/
        """
        match = _RECORD_FORMAT.match(frame)
        if not match:
            raise FrameError("Couldn't parse frame", frame)

        recno = int(match.group("recno"))
        if self.currecno is None:
            self.currecno = recno

        if recno + 1 == self.currecno:
            return None

        if recno != self.currecno:
            raise FrameError(
                f"Bad recno, got {recno!r} expected {self.currecno!r}", frame
            )

        calculated_checksum = self.checksum(match.group("check"))
        received_checksum = match.group("checksum")
        if calculated_checksum != received_checksum:
            raise FrameError(
                f"Checksum error: received {received_checksum} expected {calculated_checksum}",
                frame,
            )

        self.currecno = (self.currecno + 1) % 8
        return match.group("text")

    def connect(self):
        """Connecting the device, nothing to be done.
        All process is hadled by hiddevice
        """
        pass

    def _get_info_record(self):
        self.currecno = None
        self.state = Mode.ESTABLISH
        try:
            while True:
                self.write("\x04")
                res = self.read()
                if res[0] == 4 and res[-1] == 5:
                    # we are connected and just got a header
                    header_record = res.decode()
                    stx = header_record.find("\x02")
                    if stx != -1:
                        result = _RECORD_FORMAT.match(header_record[stx:]).group("text")
                        self.parse_header_record(result)
                    break
                else:
                    pass

        except FrameError as e:
            print("Frame error")
            raise e

        except Exception as e:
            print("Uknown error occured")
            raise e

    def disconnect(self):
        """Disconnect the device, nothing to be done."""
        pass

    # Some of the commands are also shared across devices that use this HID
    # protocol, but not many. Only provide here those that do seep to change
    # between them.
    def _get_version(self) -> str:
        """Return the software version of the device."""
        return self.dig_ver + " - " + self.anlg_ver + " - " + self.agp_ver

    def _get_serial_number(self) -> str:
        """Returns the serial number of the device."""
        return self.serial_num

    def _get_glucose_unit(self) -> str:
        """Return 0 for mg/dL, 1 for mmol/L"""
        return self.unit

    def get_datetime(self) -> datetime.datetime:
        datetime_str = self.datetime
        return datetime.datetime(
            int(datetime_str[0:4]),  # year
            int(datetime_str[4:6]),  # month
            int(datetime_str[6:8]),  # day
            int(datetime_str[8:10]),  # hour
            int(datetime_str[10:12]),  # minute
            0,
        )

    def sync(self) -> Generator[str, None, None]:
        """
        Sync with meter and yield received data frames
        FSM implemented by Anders Hammarquist's for glucodump
        More info: https://bitbucket.org/iko/glucodump/src/default/
        """
        self.state = Mode.ESTABLISH
        try:
            tometer = "\x04"
            result = None
            foo = 0
            while True:
                self.write(tometer)
                if result is not None and self.state == Mode.DATA:
                    yield result
                result = None
                data_bytes = self.read()
                data = data_bytes.decode()

                if self.state == Mode.ESTABLISH:
                    if data_bytes[-1] == 15:
                        # got a <NAK>, send <EOT>
                        tometer = chr(foo)
                        foo += 1
                        foo %= 256
                        continue
                    if data_bytes[-1] == 5:
                        # got an <ENQ>, send <ACK>
                        tometer = "\x06"
                        self.currecno = None
                        continue
                if self.state == Mode.DATA:
                    if data_bytes[-1] == 4:
                        # got an <EOT>, done
                        self.state = Mode.PRECOMMAND
                        break
                stx = data.find("\x02")
                if stx != -1:
                    # got <STX>, parse frame
                    try:
                        result = self.checkframe(data[stx:])
                        tometer = "\x06"
                        self.state = Mode.DATA
                    except FrameError:
                        tometer = "\x15"  # Couldn't parse, <NAK>
                else:
                    # Got something we don't understand, <NAK> it
                    tometer = "\x15"
        except Exception as e:
            raise e

    def parse_result_record(self, text: str) -> Dict[str, str]:
        result = _RESULT_RECORD_RE.search(text)
        assert result is not None
        rec_text = result.groupdict()
        return rec_text

    def _get_multirecord(self) -> List[Dict[str, str]]:
        """Queries for, and returns, "multirecords" results.

        Returns:
          (csv.reader): a CSV reader object that returns a record for each line
             in the record file.
        """
        records_arr = []
        for rec in self.sync():
            if rec[0] == "R":
                # parse using result record regular expression
                rec_text = self.parse_result_record(rec)
                # get dictionary to use in main driver module without import re

                records_arr.append(rec_text)
        # return csv.reader(records_arr)
        return records_arr  # array of groupdicts