diff options
Diffstat (limited to 'glucometerutils/drivers')
-rw-r--r-- | glucometerutils/drivers/fslibre.py | 239 | ||||
-rw-r--r-- | glucometerutils/drivers/fslibre2.py | 32 |
2 files changed, 36 insertions, 235 deletions
diff --git a/glucometerutils/drivers/fslibre.py b/glucometerutils/drivers/fslibre.py index c11bab0..823e49c 100644 --- a/glucometerutils/drivers/fslibre.py +++ b/glucometerutils/drivers/fslibre.py @@ -20,244 +20,13 @@ https://protocols.glucometers.tech/abbott/freestyle-libre """ -import datetime -import logging -from typing import Dict, Generator, Mapping, Optional, Sequence, Tuple, Type +from typing import Optional -from glucometerutils import common -from glucometerutils.support import freestyle +from glucometerutils.support import freestyle_libre -# Fields of the records returned by both $history and $arresult? -# Tuple of pairs of idx and field name -_BASE_ENTRY_MAP = ( - (0, "device_id"), - (1, "type"), - (2, "month"), - (3, "day"), - (4, "year"), # 2-digits - (5, "hour"), - (6, "minute"), - (7, "second"), -) -# Fields of the records returned by $history? -_HISTORY_ENTRY_MAP = _BASE_ENTRY_MAP + ( - (13, "value"), - (15, "errors"), -) - -# Fields of the results returned by $arresult? where type = 2 -_ARRESULT_TYPE2_ENTRY_MAP = ( - (9, "reading-type"), # 0 = glucose blood strip, - # 1 = ketone blood strip, - # 2 = glucose sensor - (12, "value"), - (15, "sport-flag"), - (16, "medication-flag"), - (17, "rapid-acting-flag"), # see _ARRESULT_RAPID_INSULIN_ENTRY_MAP - (18, "long-acting-flag"), - (19, "custom-comments-bitfield"), - (23, "double-long-acting-insulin"), - (25, "food-flag"), - (26, "food-carbs-grams"), - (28, "errors"), -) - -_ARRESULT_TIME_ADJUSTMENT_ENTRY_MAP = ( - (9, "old_month"), - (10, "old_day"), - (11, "old_year"), - (12, "old_hour"), - (13, "old_minute"), - (14, "old_second"), -) - -# Fields only valid when rapid-acting-flag is "1" -_ARRESULT_RAPID_INSULIN_ENTRY_MAP = ((43, "double-rapid-acting-insulin"),) - - -def _parse_record( - record: Sequence[str], entry_map: Sequence[Tuple[int, str]] -) -> Dict[str, int]: - """Parses a list of string fields into a dictionary of integers.""" - - if not record: - return {} - - try: - return {key: int(record[idx]) for idx, key in entry_map} - except IndexError: - return {} - - -def _extract_timestamp( - parsed_record: Mapping[str, int], prefix: str = "" -) -> datetime.datetime: - """Extract the timestamp from a parsed record. - - This leverages the fact that all the records have the same base structure. - """ - - return datetime.datetime( - parsed_record[prefix + "year"] + 2000, - parsed_record[prefix + "month"], - parsed_record[prefix + "day"], - parsed_record[prefix + "hour"], - parsed_record[prefix + "minute"], - parsed_record[prefix + "second"], - ) - - -def _parse_arresult(record: Sequence[str]) -> Optional[common.AnyReading]: - """Takes an array of string fields as input and parses it into a Reading.""" - - parsed_record = _parse_record(record, _BASE_ENTRY_MAP) - - # There are other record types, but we don't currently need to expose these. - if not parsed_record: - return None - elif parsed_record["type"] == 2: - parsed_record.update(_parse_record(record, _ARRESULT_TYPE2_ENTRY_MAP)) - elif parsed_record["type"] == 5: - parsed_record.update(_parse_record(record, _ARRESULT_TIME_ADJUSTMENT_ENTRY_MAP)) - return common.TimeAdjustment( - _extract_timestamp(parsed_record), - _extract_timestamp(parsed_record, "old_"), - extra_data={"device_id": parsed_record["device_id"]}, - ) - else: - return None - - # Check right away if we have rapid insulin - if parsed_record["rapid-acting-flag"]: - parsed_record.update(_parse_record(record, _ARRESULT_RAPID_INSULIN_ENTRY_MAP)) - - if parsed_record["errors"]: - return None - - comment_parts = [] - measure_method: Optional[common.MeasurementMethod] = None - cls: Optional[Type[common.AnyReading]] = None - value: Optional[float] = None - - if parsed_record["reading-type"] == 2: - comment_parts.append("(Scan)") - measure_method = common.MeasurementMethod.CGM - cls = common.GlucoseReading - value = parsed_record["value"] - elif parsed_record["reading-type"] == 0: - comment_parts.append("(Blood)") - measure_method = common.MeasurementMethod.BLOOD_SAMPLE - cls = common.GlucoseReading - value = parsed_record["value"] - elif parsed_record["reading-type"] == 1: - comment_parts.append("(Ketone)") - measure_method = common.MeasurementMethod.BLOOD_SAMPLE - cls = common.KetoneReading - # automatically convert the raw value in mmol/L - raw_value = parsed_record["value"] - if raw_value is None: - raise ValueError(f"Invalid Ketone value: {parsed_record!r}") - value = freestyle.convert_ketone_unit(raw_value) - else: - # unknown reading - return None - - custom_comments = record[29:35] - for comment_index in range(6): - if parsed_record["custom-comments-bitfield"] & (1 << comment_index): - comment_parts.append(custom_comments[comment_index]) - - if parsed_record["sport-flag"]: - comment_parts.append("Sport") - - if parsed_record["medication-flag"]: - comment_parts.append("Medication") - - if parsed_record["food-flag"]: - grams = parsed_record["food-carbs-grams"] - if grams: - comment_parts.append(f"Food ({grams} g)") - else: - comment_parts.append("Food") - - if parsed_record["long-acting-flag"]: - insulin = parsed_record["double-long-acting-insulin"] / 2 - if insulin: - comment_parts.append(f"Long-acting insulin ({insulin:.1f})") - else: - comment_parts.append("Long-acting insulin") - - if parsed_record["rapid-acting-flag"]: - # This record does not always exist, so calculate it only when present. - if "double-rapid-acting-insulin" in parsed_record: - rapid_insulin = parsed_record["double-rapid-acting-insulin"] / 2 - comment_parts.append(f"Rapid-acting insulin ({rapid_insulin:.1f})") - else: - comment_parts.append("Rapid-acting insulin") - - reading = cls( - _extract_timestamp(parsed_record), - value, - comment="; ".join(comment_parts), - measure_method=measure_method, - extra_data={"device_id": parsed_record["device_id"]}, - ) - - return reading - - -class Device(freestyle.FreeStyleHidDevice): - """Glucometer driver for FreeStyle Libre devices.""" +class Device(freestyle_libre.LibreDevice): + _MODEL_NAME = "FreeStyle Libre" def __init__(self, device_path: Optional[str]) -> None: super().__init__(0x3650, device_path, encoding="utf-8") - - def get_meter_info(self) -> common.MeterInfo: - """Return the device information in structured form.""" - return common.MeterInfo( - "FreeStyle Libre", - serial_number=self.get_serial_number(), - version_info=("Software version: " + self._get_version(),), - native_unit=self.get_glucose_unit(), - patient_name=self.get_patient_name(), - ) - - def get_serial_number(self) -> str: - """Overridden function as the command is not compatible.""" - return self._session.send_text_command(b"$sn?").rstrip("\r\n") - - def get_glucose_unit(self) -> common.Unit: # pylint: disable=no-self-use - """Returns the glucose unit of the device.""" - # TODO(Flameeyes): figure out how to identify the actual unit on the - # device. - return common.Unit.MG_DL - - def get_readings(self) -> Generator[common.AnyReading, None, None]: - # First of all get the usually longer list of sensor readings, and - # convert them to Readings objects. - for record in self._session.query_multirecord(b"$history?"): - parsed_record = _parse_record(record, _HISTORY_ENTRY_MAP) - - if not parsed_record or parsed_record["errors"] != 0: - # The reading is considered invalid, so ignore it. - continue - - yield common.GlucoseReading( - _extract_timestamp(parsed_record), - parsed_record["value"], - comment="(Sensor)", - measure_method=common.MeasurementMethod.CGM, - extra_data={"device_id": parsed_record["device_id"]}, - ) - - # Then get the results of explicit scans and blood tests (and other - # events). - for record in self._session.query_multirecord(b"$arresult?"): - logging.debug(f"Retrieved arresult: {record!r}") - reading = _parse_arresult(record) - if reading: - yield reading - - def zero_log(self) -> None: - self._session.send_text_command(b"$resetpatient") diff --git a/glucometerutils/drivers/fslibre2.py b/glucometerutils/drivers/fslibre2.py new file mode 100644 index 0000000..a8cd657 --- /dev/null +++ b/glucometerutils/drivers/fslibre2.py @@ -0,0 +1,32 @@ +# -*- coding: utf-8 -*- +# +# SPDX-FileCopyrightText: © 2023 The glucometerutils Authors +# SPDX-License-Identifier: MIT +"""Driver for FreeStyle Libre 2 devices. + +Supported features: + The same as the fslibre driver. + +Expected device path: /dev/hidraw9 or similar HID device. Optional when using +HIDAPI. + +This driver is a shim on top of the fslibre driver, forcing encryption to be +enabled for the session. + +Further information on the device protocol can be found at + +https://protocols.glucometers.tech/abbott/freestyle-libre +https://protocols.glucometers.tech/abbott/freestyle-libre-2 + +""" + +from typing import Optional + +from glucometerutils.support import freestyle_libre + + +class Device(freestyle_libre.LibreDevice): + _MODEL_NAME = "FreeStyle Libre 2" + + def __init__(self, device_path: Optional[str]) -> None: + super().__init__(0x3950, device_path, encoding="utf-8", encrypted=True) |