diff options
Diffstat (limited to 'glucometerutils/support/contourusb.py')
-rw-r--r-- | glucometerutils/support/contourusb.py | 328 |
1 files changed, 328 insertions, 0 deletions
diff --git a/glucometerutils/support/contourusb.py b/glucometerutils/support/contourusb.py new file mode 100644 index 0000000..d3bd6dc --- /dev/null +++ b/glucometerutils/support/contourusb.py @@ -0,0 +1,328 @@ +# -*- coding: utf-8 -*- +# +# SPDX-License-Identifier: MIT +"""Common routines to implement the ContourUSB common protocol. + +Protocol documentation available from Ascensia at +http://protocols.ascensia.com/Programming-Guide.aspx + +* glucodump code segments are developed by Anders Hammarquist +* Rest of code is developed by Arvanitis Christos + +""" + +import csv +import datetime +import logging +import re +import construct + +from glucometerutils import exceptions +from glucometerutils.exceptions import InvalidResponse +from glucometerutils.support import hiddevice + +# regexr.com/4k6jb +_HEADER_RECORD_RE = re.compile( + "^(?P<record_type>[a-zA-Z])\\|(?P<field_del>.)(?P<repeat_del>.)" + "(?P<component_del>.)(?P<escape_del>.)\\|\\w*\\|(?P<product_code>\\w+)" + "\\^(?P<dig_ver>[0-9]{2}\\.[0-9]{2})\\\\(?P<anlg_ver>[0-9]{2}\\.[0-9]{2})" + "\\\\(?P<agp_ver>[0-9]{2}\\.[0-9]{2}\\.[0-9]{2})\\" + "^(?P<serial_num>(\\w|-)+)\\^(?P<sku_id>(\\w|-)+)\\|" + "A=(?P<res_marking>[0-9])\\^C=(?P<config_bits>[0-9]+)\\" + "^G=(?P<lang>[0-9]+)\\^I=(?P<interv>[0-9]+)\\^R=(?P<ref_method>[0-9]+)\\" + "^S=(?P<internal>[0-9]+)\\^U=(?P<unit>[0-9]+)\\" + "^V=(?P<lo_bound>[0-9]{2})(?P<hi_bound>[0-9]{3})\\" + "^X=(?P<hypo_limit>[0-9]{3})(?P<overall_low>[0-9]{3})" + "(?P<pre_food_low>[0-9]{3})(?P<post_food_low>[0-9]{3})" + "(?P<overall_high>[0-9]{3})(?P<pre_food_high>[0-9]{3})" + "(?P<post_food_high>[0-9]{3})(?P<hyper_limit>[0-9]{3})\\" + "^Y=(?P<upp_hyper>[0-9]{3})(?P<low_hyper>[0-9]{3})" + "(?P<upp_hypo>[0-9]{3})(?P<low_hypo>[0-9]{3})(?P<upp_low_target>[0-9]{3})" + "(?P<low_low_target>[0-9]{3})(?P<upp_hi_target>[0-9]{3})" + "(?P<low_hi_target>[0-9]{3})\\^Z=(?P<trends>[0-2])\\|" + "(?P<total>[0-9]*)\\|\\|\\|\\|\\|\\|" + "(?P<spec_ver>[0-9]+)\\|(?P<datetime>[0-9]+)") + +_RESULT_RECORD_RE = re.compile( + "^(?P<record_type>[a-zA-Z])\\|(?P<seq_num>[0-9]+)\\|\\w*\\^\\w*\\^\\w*\\" + "^(?P<test_id>\\w+)\\|(?P<value>[0-9]+)\\|(?P<unit>\\w+\\/\\w+)\\^" + "(?P<ref_method>[BPD])\\|\\|(?P<markers>[><BADISXCZ\\/1-12]*)\\|\\|" + "(?P<datetime>[0-9]+)") + +_RECORD_FORMAT = re.compile( + '\x02(?P<check>(?P<recno>[0-7])(?P<text>[^\x0d]*)' + '\x0d(?P<end>[\x03\x17]))' + '(?P<checksum>[0-9A-F][0-9A-F])\x0d\x0a') + +class FrameError(Exception): + pass + +class ContourHidDevice(hiddevice.HidDevice): + """Base class implementing the ContourUSB HID common protocol. + """ + blocksize = 64 + + # Operation modes + mode_establish = object + mode_data = object() + mode_precommand = object() + mode_command = object() + state = None + + def read(self, r_size=blocksize): + result = [] + + while True: + data = self._read() + dstr = data + result.append(dstr[4:data[3]+4]) + if data[3] != self.blocksize-4: + break + + return (b"".join(result)) + + def write(self, data): + data = b'ABC' + chr(len(data)).encode() + data.encode() + pad_length = self.blocksize - len(data) + data += pad_length * b'\x00' + + self._write(data) + + USB_VENDOR_ID = 0x1a79 # type: int # Bayer Health Care LLC Contour + USB_PRODUCT_ID = 0x6002 # type: int + + def parse_header_record(self, text): + header = _HEADER_RECORD_RE.search(text) + + self.field_del = header.group('field_del') + self.repeat_del = header.group('repeat_del') + self.component_del = header.group('component_del') + self.escape_del = header.group('escape_del') + + self.product_code = header.group('product_code') + self.dig_ver = header.group('dig_ver') + self.anlg_ver = header.group('anlg_ver') + self.agp_ver = header.group('agp_ver') + + self.serial_num = header.group('serial_num') + self.sku_id = header.group('sku_id') + self.res_marking = header.group('res_marking') + self.config_bits = header.group('config_bits') + self.lang = header.group('lang') + self.interv = header.group('interv') + self.ref_method = header.group('ref_method') + self.internal = header.group('internal') + + # U limit + self.unit = header.group('unit') + self.lo_bound = header.group('lo_bound') + self.hi_bound = header.group('hi_bound') + + # X field + self.hypo_limit = header.group('hypo_limit') + self.overall_low = header.group('overall_low') + self.pre_food_low = header.group('pre_food_low') + self.post_food_low = header.group('post_food_low') + self.overall_high = header.group('overall_high') + self.pre_food_high = header.group('pre_food_high') + self.post_food_high = header.group('post_food_high') + self.hyper_limit = header.group('hyper_limit') + + # Y field + self.upp_hyper = header.group('upp_hyper') + self.low_hyper = header.group('low_hyper') + self.upp_hypo = header.group('upp_hypo') + self.low_hypo = header.group('low_hypo') + self.upp_low_target = header.group('upp_low_target') + self.low_low_target = header.group('low_low_target') + self.upp_hi_target = header.group('upp_hi_target') + self.low_hi_target = header.group('low_hi_target') + + # Z field + self.trends = header.group('trends') + + self.total = header.group('total') + self.spec_ver = header.group('spec_ver') + # Datetime string in YYYYMMDDHHMM format + self.datetime = header.group('datetime') + + + def checksum(self, text): + """ + Implemented by Anders Hammarquist for glucodump project + More info: https://bitbucket.org/iko/glucodump/src/default/ + """ + checksum = hex(sum(ord(c) for c in text) % 256).upper().split('X')[1] + return ('00' + checksum)[-2:] + + def checkframe(self, frame): + """ + Implemented by Anders Hammarquist for glucodump project + More info: https://bitbucket.org/iko/glucodump/src/default/ + """ + match = _RECORD_FORMAT.match(frame) + if not match: + raise FrameError("Couldn't parse frame", frame) + + recno = int(match.group('recno')) + if self.currecno is None: + self.currecno = recno + + if recno + 1 == self.currecno: + return None + + if recno != self.currecno: + raise FrameError("Bad recno, got %r expected %r" % + (recno, self.currecno), + frame) + + checksum = self.checksum(match.group('check')) + if checksum != match.group('checksum'): + raise FrameError("Checksum error: got %s expected %s" % + (match.group('checksum'), checksum), + frame) + + self.currecno = (self.currecno + 1) % 8 + return match.group('text') + + def connect(self): + """Connecting the device, nothing to be done. + All process is hadled by hiddevice + """ + pass + + def _get_info_record(self): + self.currecno = None + self.state = self.mode_establish + try: + while True: + self.write('\x04') + res = self.read() + if res[0] == 4 and res[-1] == 5: + # we are connected and just got a header + header_record = res.decode() + stx = header_record.find('\x02') + if stx != -1: + result = _RECORD_FORMAT.match( + header_record[stx:]).group('text') + self.parse_header_record(result) + break + else: + pass + + except FrameError as e: + print("Frame error") + raise e + + except Exception as e: + print("Uknown error occured") + raise e + + def disconnect(self): + """Disconnect the device, nothing to be done.""" + pass + + # Some of the commands are also shared across devices that use this HID + # protocol, but not many. Only provide here those that do seep to change + # between them. + def _get_version(self): + # type: () -> Text + """Return the software version of the device.""" + return self.dig_ver + " - " + self.anlg_ver + " - " + self.agp_ver + + def _get_serial_number(self): + # type: () -> Text + """Returns the serial number of the device.""" + return self.serial_num + + def _get_glucose_unit(self): + # type: () -> Text + """Return 0 for mg/dL, 1 for mmol/L""" + return self.unit + + def get_datetime(self): + # type: () -> datetime.datetime + datetime_str = self.datetime + return datetime.datetime( + int(datetime_str[0:4]), # year + int(datetime_str[4:6]), # month + int(datetime_str[6:8]), # day + int(datetime_str[8:10]), # hour + int(datetime_str[10:12]), # minute + 0) + + def sync(self): + """ + Sync with meter and yield received data frames + FSM implemented by Anders Hammarquist's for glucodump + More info: https://bitbucket.org/iko/glucodump/src/default/ + """ + self.state = self.mode_establish + try: + tometer = '\x04' + result = None + foo = 0 + while True: + self.write(tometer) + if result is not None and self.state == self.mode_data: + yield result + result = None + data_bytes = self.read() + data = data_bytes.decode() + + if self.state == self.mode_establish: + if data_bytes[-1] == 15: + # got a <NAK>, send <EOT> + tometer = chr(foo) + foo += 1 + foo %= 256 + continue + if data_bytes[-1] == 5: + # got an <ENQ>, send <ACK> + tometer = '\x06' + self.currecno = None + continue + if self.state == self.mode_data: + if data_bytes[-1] == 4: + # got an <EOT>, done + self.state = self.mode_precommand + break + stx = data.find('\x02') + if stx != -1: + # got <STX>, parse frame + try: + result = self.checkframe(data[stx:]) + tometer = '\x06' + self.state = self.mode_data + except FrameError as e: + tometer = '\x15' # Couldn't parse, <NAK> + else: + # Got something we don't understand, <NAK> it + tometer = '\x15' + except Exception as e: + raise e + + def parse_result_record(self, text): + # type : text -> dict + result = _RESULT_RECORD_RE.search(text) + rec_text = result.groupdict() + return rec_text + + def _get_multirecord(self): + # type: (bytes) -> Iterator[List[Text]] + """Queries for, and returns, "multirecords" results. + + Returns: + (csv.reader): a CSV reader object that returns a record for each line + in the record file. + """ + records_arr = [] + for rec in self.sync(): + if rec[0] == 'R': + # parse using result record regular expression + rec_text = self.parse_result_record(rec) + # get dictionary to use in main driver module without import re + + records_arr.append(rec_text) + # return csv.reader(records_arr) + return records_arr # array of groupdicts
\ No newline at end of file |