summaryrefslogtreecommitdiffstats
path: root/glucometerutils/support/contourusb.py
diff options
context:
space:
mode:
Diffstat (limited to 'glucometerutils/support/contourusb.py')
-rw-r--r--glucometerutils/support/contourusb.py328
1 files changed, 328 insertions, 0 deletions
diff --git a/glucometerutils/support/contourusb.py b/glucometerutils/support/contourusb.py
new file mode 100644
index 0000000..d3bd6dc
--- /dev/null
+++ b/glucometerutils/support/contourusb.py
@@ -0,0 +1,328 @@
+# -*- coding: utf-8 -*-
+#
+# SPDX-License-Identifier: MIT
+"""Common routines to implement the ContourUSB common protocol.
+
+Protocol documentation available from Ascensia at
+http://protocols.ascensia.com/Programming-Guide.aspx
+
+* glucodump code segments are developed by Anders Hammarquist
+* Rest of code is developed by Arvanitis Christos
+
+"""
+
+import csv
+import datetime
+import logging
+import re
+import construct
+
+from glucometerutils import exceptions
+from glucometerutils.exceptions import InvalidResponse
+from glucometerutils.support import hiddevice
+
+# regexr.com/4k6jb
+_HEADER_RECORD_RE = re.compile(
+ "^(?P<record_type>[a-zA-Z])\\|(?P<field_del>.)(?P<repeat_del>.)"
+ "(?P<component_del>.)(?P<escape_del>.)\\|\\w*\\|(?P<product_code>\\w+)"
+ "\\^(?P<dig_ver>[0-9]{2}\\.[0-9]{2})\\\\(?P<anlg_ver>[0-9]{2}\\.[0-9]{2})"
+ "\\\\(?P<agp_ver>[0-9]{2}\\.[0-9]{2}\\.[0-9]{2})\\"
+ "^(?P<serial_num>(\\w|-)+)\\^(?P<sku_id>(\\w|-)+)\\|"
+ "A=(?P<res_marking>[0-9])\\^C=(?P<config_bits>[0-9]+)\\"
+ "^G=(?P<lang>[0-9]+)\\^I=(?P<interv>[0-9]+)\\^R=(?P<ref_method>[0-9]+)\\"
+ "^S=(?P<internal>[0-9]+)\\^U=(?P<unit>[0-9]+)\\"
+ "^V=(?P<lo_bound>[0-9]{2})(?P<hi_bound>[0-9]{3})\\"
+ "^X=(?P<hypo_limit>[0-9]{3})(?P<overall_low>[0-9]{3})"
+ "(?P<pre_food_low>[0-9]{3})(?P<post_food_low>[0-9]{3})"
+ "(?P<overall_high>[0-9]{3})(?P<pre_food_high>[0-9]{3})"
+ "(?P<post_food_high>[0-9]{3})(?P<hyper_limit>[0-9]{3})\\"
+ "^Y=(?P<upp_hyper>[0-9]{3})(?P<low_hyper>[0-9]{3})"
+ "(?P<upp_hypo>[0-9]{3})(?P<low_hypo>[0-9]{3})(?P<upp_low_target>[0-9]{3})"
+ "(?P<low_low_target>[0-9]{3})(?P<upp_hi_target>[0-9]{3})"
+ "(?P<low_hi_target>[0-9]{3})\\^Z=(?P<trends>[0-2])\\|"
+ "(?P<total>[0-9]*)\\|\\|\\|\\|\\|\\|"
+ "(?P<spec_ver>[0-9]+)\\|(?P<datetime>[0-9]+)")
+
+_RESULT_RECORD_RE = re.compile(
+ "^(?P<record_type>[a-zA-Z])\\|(?P<seq_num>[0-9]+)\\|\\w*\\^\\w*\\^\\w*\\"
+ "^(?P<test_id>\\w+)\\|(?P<value>[0-9]+)\\|(?P<unit>\\w+\\/\\w+)\\^"
+ "(?P<ref_method>[BPD])\\|\\|(?P<markers>[><BADISXCZ\\/1-12]*)\\|\\|"
+ "(?P<datetime>[0-9]+)")
+
+_RECORD_FORMAT = re.compile(
+ '\x02(?P<check>(?P<recno>[0-7])(?P<text>[^\x0d]*)'
+ '\x0d(?P<end>[\x03\x17]))'
+ '(?P<checksum>[0-9A-F][0-9A-F])\x0d\x0a')
+
+class FrameError(Exception):
+ pass
+
+class ContourHidDevice(hiddevice.HidDevice):
+ """Base class implementing the ContourUSB HID common protocol.
+ """
+ blocksize = 64
+
+ # Operation modes
+ mode_establish = object
+ mode_data = object()
+ mode_precommand = object()
+ mode_command = object()
+ state = None
+
+ def read(self, r_size=blocksize):
+ result = []
+
+ while True:
+ data = self._read()
+ dstr = data
+ result.append(dstr[4:data[3]+4])
+ if data[3] != self.blocksize-4:
+ break
+
+ return (b"".join(result))
+
+ def write(self, data):
+ data = b'ABC' + chr(len(data)).encode() + data.encode()
+ pad_length = self.blocksize - len(data)
+ data += pad_length * b'\x00'
+
+ self._write(data)
+
+ USB_VENDOR_ID = 0x1a79 # type: int # Bayer Health Care LLC Contour
+ USB_PRODUCT_ID = 0x6002 # type: int
+
+ def parse_header_record(self, text):
+ header = _HEADER_RECORD_RE.search(text)
+
+ self.field_del = header.group('field_del')
+ self.repeat_del = header.group('repeat_del')
+ self.component_del = header.group('component_del')
+ self.escape_del = header.group('escape_del')
+
+ self.product_code = header.group('product_code')
+ self.dig_ver = header.group('dig_ver')
+ self.anlg_ver = header.group('anlg_ver')
+ self.agp_ver = header.group('agp_ver')
+
+ self.serial_num = header.group('serial_num')
+ self.sku_id = header.group('sku_id')
+ self.res_marking = header.group('res_marking')
+ self.config_bits = header.group('config_bits')
+ self.lang = header.group('lang')
+ self.interv = header.group('interv')
+ self.ref_method = header.group('ref_method')
+ self.internal = header.group('internal')
+
+ # U limit
+ self.unit = header.group('unit')
+ self.lo_bound = header.group('lo_bound')
+ self.hi_bound = header.group('hi_bound')
+
+ # X field
+ self.hypo_limit = header.group('hypo_limit')
+ self.overall_low = header.group('overall_low')
+ self.pre_food_low = header.group('pre_food_low')
+ self.post_food_low = header.group('post_food_low')
+ self.overall_high = header.group('overall_high')
+ self.pre_food_high = header.group('pre_food_high')
+ self.post_food_high = header.group('post_food_high')
+ self.hyper_limit = header.group('hyper_limit')
+
+ # Y field
+ self.upp_hyper = header.group('upp_hyper')
+ self.low_hyper = header.group('low_hyper')
+ self.upp_hypo = header.group('upp_hypo')
+ self.low_hypo = header.group('low_hypo')
+ self.upp_low_target = header.group('upp_low_target')
+ self.low_low_target = header.group('low_low_target')
+ self.upp_hi_target = header.group('upp_hi_target')
+ self.low_hi_target = header.group('low_hi_target')
+
+ # Z field
+ self.trends = header.group('trends')
+
+ self.total = header.group('total')
+ self.spec_ver = header.group('spec_ver')
+ # Datetime string in YYYYMMDDHHMM format
+ self.datetime = header.group('datetime')
+
+
+ def checksum(self, text):
+ """
+ Implemented by Anders Hammarquist for glucodump project
+ More info: https://bitbucket.org/iko/glucodump/src/default/
+ """
+ checksum = hex(sum(ord(c) for c in text) % 256).upper().split('X')[1]
+ return ('00' + checksum)[-2:]
+
+ def checkframe(self, frame):
+ """
+ Implemented by Anders Hammarquist for glucodump project
+ More info: https://bitbucket.org/iko/glucodump/src/default/
+ """
+ match = _RECORD_FORMAT.match(frame)
+ if not match:
+ raise FrameError("Couldn't parse frame", frame)
+
+ recno = int(match.group('recno'))
+ if self.currecno is None:
+ self.currecno = recno
+
+ if recno + 1 == self.currecno:
+ return None
+
+ if recno != self.currecno:
+ raise FrameError("Bad recno, got %r expected %r" %
+ (recno, self.currecno),
+ frame)
+
+ checksum = self.checksum(match.group('check'))
+ if checksum != match.group('checksum'):
+ raise FrameError("Checksum error: got %s expected %s" %
+ (match.group('checksum'), checksum),
+ frame)
+
+ self.currecno = (self.currecno + 1) % 8
+ return match.group('text')
+
+ def connect(self):
+ """Connecting the device, nothing to be done.
+ All process is hadled by hiddevice
+ """
+ pass
+
+ def _get_info_record(self):
+ self.currecno = None
+ self.state = self.mode_establish
+ try:
+ while True:
+ self.write('\x04')
+ res = self.read()
+ if res[0] == 4 and res[-1] == 5:
+ # we are connected and just got a header
+ header_record = res.decode()
+ stx = header_record.find('\x02')
+ if stx != -1:
+ result = _RECORD_FORMAT.match(
+ header_record[stx:]).group('text')
+ self.parse_header_record(result)
+ break
+ else:
+ pass
+
+ except FrameError as e:
+ print("Frame error")
+ raise e
+
+ except Exception as e:
+ print("Uknown error occured")
+ raise e
+
+ def disconnect(self):
+ """Disconnect the device, nothing to be done."""
+ pass
+
+ # Some of the commands are also shared across devices that use this HID
+ # protocol, but not many. Only provide here those that do seep to change
+ # between them.
+ def _get_version(self):
+ # type: () -> Text
+ """Return the software version of the device."""
+ return self.dig_ver + " - " + self.anlg_ver + " - " + self.agp_ver
+
+ def _get_serial_number(self):
+ # type: () -> Text
+ """Returns the serial number of the device."""
+ return self.serial_num
+
+ def _get_glucose_unit(self):
+ # type: () -> Text
+ """Return 0 for mg/dL, 1 for mmol/L"""
+ return self.unit
+
+ def get_datetime(self):
+ # type: () -> datetime.datetime
+ datetime_str = self.datetime
+ return datetime.datetime(
+ int(datetime_str[0:4]), # year
+ int(datetime_str[4:6]), # month
+ int(datetime_str[6:8]), # day
+ int(datetime_str[8:10]), # hour
+ int(datetime_str[10:12]), # minute
+ 0)
+
+ def sync(self):
+ """
+ Sync with meter and yield received data frames
+ FSM implemented by Anders Hammarquist's for glucodump
+ More info: https://bitbucket.org/iko/glucodump/src/default/
+ """
+ self.state = self.mode_establish
+ try:
+ tometer = '\x04'
+ result = None
+ foo = 0
+ while True:
+ self.write(tometer)
+ if result is not None and self.state == self.mode_data:
+ yield result
+ result = None
+ data_bytes = self.read()
+ data = data_bytes.decode()
+
+ if self.state == self.mode_establish:
+ if data_bytes[-1] == 15:
+ # got a <NAK>, send <EOT>
+ tometer = chr(foo)
+ foo += 1
+ foo %= 256
+ continue
+ if data_bytes[-1] == 5:
+ # got an <ENQ>, send <ACK>
+ tometer = '\x06'
+ self.currecno = None
+ continue
+ if self.state == self.mode_data:
+ if data_bytes[-1] == 4:
+ # got an <EOT>, done
+ self.state = self.mode_precommand
+ break
+ stx = data.find('\x02')
+ if stx != -1:
+ # got <STX>, parse frame
+ try:
+ result = self.checkframe(data[stx:])
+ tometer = '\x06'
+ self.state = self.mode_data
+ except FrameError as e:
+ tometer = '\x15' # Couldn't parse, <NAK>
+ else:
+ # Got something we don't understand, <NAK> it
+ tometer = '\x15'
+ except Exception as e:
+ raise e
+
+ def parse_result_record(self, text):
+ # type : text -> dict
+ result = _RESULT_RECORD_RE.search(text)
+ rec_text = result.groupdict()
+ return rec_text
+
+ def _get_multirecord(self):
+ # type: (bytes) -> Iterator[List[Text]]
+ """Queries for, and returns, "multirecords" results.
+
+ Returns:
+ (csv.reader): a CSV reader object that returns a record for each line
+ in the record file.
+ """
+ records_arr = []
+ for rec in self.sync():
+ if rec[0] == 'R':
+ # parse using result record regular expression
+ rec_text = self.parse_result_record(rec)
+ # get dictionary to use in main driver module without import re
+
+ records_arr.append(rec_text)
+ # return csv.reader(records_arr)
+ return records_arr # array of groupdicts \ No newline at end of file