# -*- coding: utf-8 -*- # # SPDX-FileCopyrightText: © 2019 The glucometerutils Authors # SPDX-License-Identifier: MIT """Common routines to implement the ContourUSB common protocol. Protocol documentation available from Ascensia at http://protocols.ascensia.com/Programming-Guide.aspx * glucodump code segments are developed by Anders Hammarquist * Rest of code is developed by Arvanitis Christos """ import datetime import enum import re from typing import Dict, Generator, List, Optional, Tuple from glucometerutils.support import driver_base, hiddevice # regexr.com/4k6jb _HEADER_RECORD_RE = re.compile( "^(?P[a-zA-Z])\\|(?P.)(?P.)" "(?P.)(?P.)\\|\\w*\\|(?P\\w+)" "\\^(?P[0-9]{2}\\.[0-9]{2})\\\\(?P[0-9]{2}\\.[0-9]{2})" "\\\\(?P[0-9]{2}\\.[0-9]{2}\\.[0-9]{2})\\" "^(?P(\\w|-)+)\\^(?P(\\w|-)+)\\|" "A=(?P[0-9])\\^C=(?P[0-9]+)\\" "^G=(?P[0-9]+)\\^I=(?P[0-9]+)\\^R=(?P[0-9]+)\\" "^S=(?P[0-9]+)\\^U=(?P[0-9]+)\\" "^V=(?P[0-9]{2})(?P[0-9]{3})\\" "^X=(?P[0-9]{3})(?P[0-9]{3})" "(?P[0-9]{3})(?P[0-9]{3})" "(?P[0-9]{3})(?P[0-9]{3})" "(?P[0-9]{3})(?P[0-9]{3})\\" "^Y=(?P[0-9]{3})(?P[0-9]{3})" "(?P[0-9]{3})(?P[0-9]{3})(?P[0-9]{3})" "(?P[0-9]{3})(?P[0-9]{3})" "(?P[0-9]{3})\\^Z=(?P[0-2])\\|" "(?P[0-9]*)\\|\\|\\|\\|\\|\\|" "(?P[0-9]+)\\|(?P[0-9]+)" ) _RESULT_RECORD_RE = re.compile( "^(?P[a-zA-Z])\\|(?P[0-9]+)\\|\\w*\\^\\w*\\^\\w*\\" "^(?P\\w+)\\|(?P[0-9]+)\\|(?P\\w+\\/\\w+)\\^" "(?P[BPD])\\|\\|(?P[>[0-9]+)" ) _RECORD_FORMAT = re.compile( "\x02(?P(?P[0-7])(?P[^\x0d]*)" "\x0d(?P[\x03\x17]))" "(?P[0-9A-F][0-9A-F])\x0d\x0a" ) class FrameError(Exception): pass @enum.unique class Mode(enum.Enum): """Operation modes.""" ESTABLISH = enum.auto() DATA = enum.auto() PRECOMMAND = enum.auto() COMMAND = enum.auto() class ContourHidDevice(driver_base.GlucometerDriver): """Base class implementing the ContourUSB HID common protocol. """ blocksize = 64 state: Optional[Mode] = None currecno: Optional[int] = None def __init__(self, usb_ids: Tuple[int, int], device_path: Optional[str]) -> None: super().__init__(device_path) self._hid_session = hiddevice.HidSession(usb_ids, device_path) def read(self, r_size=blocksize): result = [] while True: data = self._hid_session.read() dstr = data data_end_idx = data[3] + 4 result.append(dstr[4:data_end_idx]) if data[3] != self.blocksize - 4: break return b"".join(result) def write(self, data): data = b"ABC" + chr(len(data)).encode() + data.encode() pad_length = self.blocksize - len(data) data += pad_length * b"\x00" self._hid_session.write(data) USB_VENDOR_ID: int = 0x1A79 # Bayer Health Care LLC Contour USB_PRODUCT_ID: int = 0x6002 def parse_header_record(self, text): header = _HEADER_RECORD_RE.search(text) self.field_del = header.group("field_del") self.repeat_del = header.group("repeat_del") self.component_del = header.group("component_del") self.escape_del = header.group("escape_del") self.product_code = header.group("product_code") self.dig_ver = header.group("dig_ver") self.anlg_ver = header.group("anlg_ver") self.agp_ver = header.group("agp_ver") self.serial_num = header.group("serial_num") self.sku_id = header.group("sku_id") self.res_marking = header.group("res_marking") self.config_bits = header.group("config_bits") self.lang = header.group("lang") self.interv = header.group("interv") self.ref_method = header.group("ref_method") self.internal = header.group("internal") # U limit self.unit = header.group("unit") self.lo_bound = header.group("lo_bound") self.hi_bound = header.group("hi_bound") # X field self.hypo_limit = header.group("hypo_limit") self.overall_low = header.group("overall_low") self.pre_food_low = header.group("pre_food_low") self.post_food_low = header.group("post_food_low") self.overall_high = header.group("overall_high") self.pre_food_high = header.group("pre_food_high") self.post_food_high = header.group("post_food_high") self.hyper_limit = header.group("hyper_limit") # Y field self.upp_hyper = header.group("upp_hyper") self.low_hyper = header.group("low_hyper") self.upp_hypo = header.group("upp_hypo") self.low_hypo = header.group("low_hypo") self.upp_low_target = header.group("upp_low_target") self.low_low_target = header.group("low_low_target") self.upp_hi_target = header.group("upp_hi_target") self.low_hi_target = header.group("low_hi_target") # Z field self.trends = header.group("trends") self.total = header.group("total") self.spec_ver = header.group("spec_ver") # Datetime string in YYYYMMDDHHMM format self.datetime = header.group("datetime") def checksum(self, text): """ Implemented by Anders Hammarquist for glucodump project More info: https://bitbucket.org/iko/glucodump/src/default/ """ checksum = hex(sum(ord(c) for c in text) % 256).upper().split("X")[1] return ("00" + checksum)[-2:] def checkframe(self, frame) -> Optional[str]: """ Implemented by Anders Hammarquist for glucodump project More info: https://bitbucket.org/iko/glucodump/src/default/ """ match = _RECORD_FORMAT.match(frame) if not match: raise FrameError("Couldn't parse frame", frame) recno = int(match.group("recno")) if self.currecno is None: self.currecno = recno if recno + 1 == self.currecno: return None if recno != self.currecno: raise FrameError( f"Bad recno, got {recno!r} expected {self.currecno!r}", frame ) calculated_checksum = self.checksum(match.group("check")) received_checksum = match.group("checksum") if calculated_checksum != received_checksum: raise FrameError( f"Checksum error: received {received_checksum} expected {calculated_checksum}", frame, ) self.currecno = (self.currecno + 1) % 8 return match.group("text") def connect(self): """Connecting the device, nothing to be done. All process is hadled by hiddevice """ pass def _get_info_record(self): self.currecno = None self.state = Mode.ESTABLISH try: while True: self.write("\x04") res = self.read() if res[0] == 4 and res[-1] == 5: # we are connected and just got a header header_record = res.decode() stx = header_record.find("\x02") if stx != -1: result = _RECORD_FORMAT.match(header_record[stx:]).group("text") self.parse_header_record(result) break else: pass except FrameError as e: print("Frame error") raise e except Exception as e: print("Uknown error occured") raise e def disconnect(self): """Disconnect the device, nothing to be done.""" pass # Some of the commands are also shared across devices that use this HID # protocol, but not many. Only provide here those that do seep to change # between them. def _get_version(self) -> str: """Return the software version of the device.""" return self.dig_ver + " - " + self.anlg_ver + " - " + self.agp_ver def _get_serial_number(self) -> str: """Returns the serial number of the device.""" return self.serial_num def _get_glucose_unit(self) -> str: """Return 0 for mg/dL, 1 for mmol/L""" return self.unit def get_datetime(self) -> datetime.datetime: datetime_str = self.datetime return datetime.datetime( int(datetime_str[0:4]), # year int(datetime_str[4:6]), # month int(datetime_str[6:8]), # day int(datetime_str[8:10]), # hour int(datetime_str[10:12]), # minute 0, ) def sync(self) -> Generator[str, None, None]: """ Sync with meter and yield received data frames FSM implemented by Anders Hammarquist's for glucodump More info: https://bitbucket.org/iko/glucodump/src/default/ """ self.state = Mode.ESTABLISH try: tometer = "\x04" result = None foo = 0 while True: self.write(tometer) if result is not None and self.state == Mode.DATA: yield result result = None data_bytes = self.read() data = data_bytes.decode() if self.state == Mode.ESTABLISH: if data_bytes[-1] == 15: # got a , send tometer = chr(foo) foo += 1 foo %= 256 continue if data_bytes[-1] == 5: # got an , send tometer = "\x06" self.currecno = None continue if self.state == Mode.DATA: if data_bytes[-1] == 4: # got an , done self.state = Mode.PRECOMMAND break stx = data.find("\x02") if stx != -1: # got , parse frame try: result = self.checkframe(data[stx:]) tometer = "\x06" self.state = Mode.DATA except FrameError: tometer = "\x15" # Couldn't parse, else: # Got something we don't understand, it tometer = "\x15" except Exception as e: raise e def parse_result_record(self, text: str) -> Dict[str, str]: result = _RESULT_RECORD_RE.search(text) assert result is not None rec_text = result.groupdict() return rec_text def _get_multirecord(self) -> List[Dict[str, str]]: """Queries for, and returns, "multirecords" results. Returns: (csv.reader): a CSV reader object that returns a record for each line in the record file. """ records_arr = [] for rec in self.sync(): if rec[0] == "R": # parse using result record regular expression rec_text = self.parse_result_record(rec) # get dictionary to use in main driver module without import re records_arr.append(rec_text) # return csv.reader(records_arr) return records_arr # array of groupdicts