summaryrefslogtreecommitdiffstats
path: root/glucometerutils/drivers/fslibre.py
diff options
context:
space:
mode:
Diffstat (limited to 'glucometerutils/drivers/fslibre.py')
-rw-r--r--glucometerutils/drivers/fslibre.py239
1 files changed, 4 insertions, 235 deletions
diff --git a/glucometerutils/drivers/fslibre.py b/glucometerutils/drivers/fslibre.py
index c11bab0..823e49c 100644
--- a/glucometerutils/drivers/fslibre.py
+++ b/glucometerutils/drivers/fslibre.py
@@ -20,244 +20,13 @@ https://protocols.glucometers.tech/abbott/freestyle-libre
"""
-import datetime
-import logging
-from typing import Dict, Generator, Mapping, Optional, Sequence, Tuple, Type
+from typing import Optional
-from glucometerutils import common
-from glucometerutils.support import freestyle
+from glucometerutils.support import freestyle_libre
-# Fields of the records returned by both $history and $arresult?
-# Tuple of pairs of idx and field name
-_BASE_ENTRY_MAP = (
- (0, "device_id"),
- (1, "type"),
- (2, "month"),
- (3, "day"),
- (4, "year"), # 2-digits
- (5, "hour"),
- (6, "minute"),
- (7, "second"),
-)
-# Fields of the records returned by $history?
-_HISTORY_ENTRY_MAP = _BASE_ENTRY_MAP + (
- (13, "value"),
- (15, "errors"),
-)
-
-# Fields of the results returned by $arresult? where type = 2
-_ARRESULT_TYPE2_ENTRY_MAP = (
- (9, "reading-type"), # 0 = glucose blood strip,
- # 1 = ketone blood strip,
- # 2 = glucose sensor
- (12, "value"),
- (15, "sport-flag"),
- (16, "medication-flag"),
- (17, "rapid-acting-flag"), # see _ARRESULT_RAPID_INSULIN_ENTRY_MAP
- (18, "long-acting-flag"),
- (19, "custom-comments-bitfield"),
- (23, "double-long-acting-insulin"),
- (25, "food-flag"),
- (26, "food-carbs-grams"),
- (28, "errors"),
-)
-
-_ARRESULT_TIME_ADJUSTMENT_ENTRY_MAP = (
- (9, "old_month"),
- (10, "old_day"),
- (11, "old_year"),
- (12, "old_hour"),
- (13, "old_minute"),
- (14, "old_second"),
-)
-
-# Fields only valid when rapid-acting-flag is "1"
-_ARRESULT_RAPID_INSULIN_ENTRY_MAP = ((43, "double-rapid-acting-insulin"),)
-
-
-def _parse_record(
- record: Sequence[str], entry_map: Sequence[Tuple[int, str]]
-) -> Dict[str, int]:
- """Parses a list of string fields into a dictionary of integers."""
-
- if not record:
- return {}
-
- try:
- return {key: int(record[idx]) for idx, key in entry_map}
- except IndexError:
- return {}
-
-
-def _extract_timestamp(
- parsed_record: Mapping[str, int], prefix: str = ""
-) -> datetime.datetime:
- """Extract the timestamp from a parsed record.
-
- This leverages the fact that all the records have the same base structure.
- """
-
- return datetime.datetime(
- parsed_record[prefix + "year"] + 2000,
- parsed_record[prefix + "month"],
- parsed_record[prefix + "day"],
- parsed_record[prefix + "hour"],
- parsed_record[prefix + "minute"],
- parsed_record[prefix + "second"],
- )
-
-
-def _parse_arresult(record: Sequence[str]) -> Optional[common.AnyReading]:
- """Takes an array of string fields as input and parses it into a Reading."""
-
- parsed_record = _parse_record(record, _BASE_ENTRY_MAP)
-
- # There are other record types, but we don't currently need to expose these.
- if not parsed_record:
- return None
- elif parsed_record["type"] == 2:
- parsed_record.update(_parse_record(record, _ARRESULT_TYPE2_ENTRY_MAP))
- elif parsed_record["type"] == 5:
- parsed_record.update(_parse_record(record, _ARRESULT_TIME_ADJUSTMENT_ENTRY_MAP))
- return common.TimeAdjustment(
- _extract_timestamp(parsed_record),
- _extract_timestamp(parsed_record, "old_"),
- extra_data={"device_id": parsed_record["device_id"]},
- )
- else:
- return None
-
- # Check right away if we have rapid insulin
- if parsed_record["rapid-acting-flag"]:
- parsed_record.update(_parse_record(record, _ARRESULT_RAPID_INSULIN_ENTRY_MAP))
-
- if parsed_record["errors"]:
- return None
-
- comment_parts = []
- measure_method: Optional[common.MeasurementMethod] = None
- cls: Optional[Type[common.AnyReading]] = None
- value: Optional[float] = None
-
- if parsed_record["reading-type"] == 2:
- comment_parts.append("(Scan)")
- measure_method = common.MeasurementMethod.CGM
- cls = common.GlucoseReading
- value = parsed_record["value"]
- elif parsed_record["reading-type"] == 0:
- comment_parts.append("(Blood)")
- measure_method = common.MeasurementMethod.BLOOD_SAMPLE
- cls = common.GlucoseReading
- value = parsed_record["value"]
- elif parsed_record["reading-type"] == 1:
- comment_parts.append("(Ketone)")
- measure_method = common.MeasurementMethod.BLOOD_SAMPLE
- cls = common.KetoneReading
- # automatically convert the raw value in mmol/L
- raw_value = parsed_record["value"]
- if raw_value is None:
- raise ValueError(f"Invalid Ketone value: {parsed_record!r}")
- value = freestyle.convert_ketone_unit(raw_value)
- else:
- # unknown reading
- return None
-
- custom_comments = record[29:35]
- for comment_index in range(6):
- if parsed_record["custom-comments-bitfield"] & (1 << comment_index):
- comment_parts.append(custom_comments[comment_index])
-
- if parsed_record["sport-flag"]:
- comment_parts.append("Sport")
-
- if parsed_record["medication-flag"]:
- comment_parts.append("Medication")
-
- if parsed_record["food-flag"]:
- grams = parsed_record["food-carbs-grams"]
- if grams:
- comment_parts.append(f"Food ({grams} g)")
- else:
- comment_parts.append("Food")
-
- if parsed_record["long-acting-flag"]:
- insulin = parsed_record["double-long-acting-insulin"] / 2
- if insulin:
- comment_parts.append(f"Long-acting insulin ({insulin:.1f})")
- else:
- comment_parts.append("Long-acting insulin")
-
- if parsed_record["rapid-acting-flag"]:
- # This record does not always exist, so calculate it only when present.
- if "double-rapid-acting-insulin" in parsed_record:
- rapid_insulin = parsed_record["double-rapid-acting-insulin"] / 2
- comment_parts.append(f"Rapid-acting insulin ({rapid_insulin:.1f})")
- else:
- comment_parts.append("Rapid-acting insulin")
-
- reading = cls(
- _extract_timestamp(parsed_record),
- value,
- comment="; ".join(comment_parts),
- measure_method=measure_method,
- extra_data={"device_id": parsed_record["device_id"]},
- )
-
- return reading
-
-
-class Device(freestyle.FreeStyleHidDevice):
- """Glucometer driver for FreeStyle Libre devices."""
+class Device(freestyle_libre.LibreDevice):
+ _MODEL_NAME = "FreeStyle Libre"
def __init__(self, device_path: Optional[str]) -> None:
super().__init__(0x3650, device_path, encoding="utf-8")
-
- def get_meter_info(self) -> common.MeterInfo:
- """Return the device information in structured form."""
- return common.MeterInfo(
- "FreeStyle Libre",
- serial_number=self.get_serial_number(),
- version_info=("Software version: " + self._get_version(),),
- native_unit=self.get_glucose_unit(),
- patient_name=self.get_patient_name(),
- )
-
- def get_serial_number(self) -> str:
- """Overridden function as the command is not compatible."""
- return self._session.send_text_command(b"$sn?").rstrip("\r\n")
-
- def get_glucose_unit(self) -> common.Unit: # pylint: disable=no-self-use
- """Returns the glucose unit of the device."""
- # TODO(Flameeyes): figure out how to identify the actual unit on the
- # device.
- return common.Unit.MG_DL
-
- def get_readings(self) -> Generator[common.AnyReading, None, None]:
- # First of all get the usually longer list of sensor readings, and
- # convert them to Readings objects.
- for record in self._session.query_multirecord(b"$history?"):
- parsed_record = _parse_record(record, _HISTORY_ENTRY_MAP)
-
- if not parsed_record or parsed_record["errors"] != 0:
- # The reading is considered invalid, so ignore it.
- continue
-
- yield common.GlucoseReading(
- _extract_timestamp(parsed_record),
- parsed_record["value"],
- comment="(Sensor)",
- measure_method=common.MeasurementMethod.CGM,
- extra_data={"device_id": parsed_record["device_id"]},
- )
-
- # Then get the results of explicit scans and blood tests (and other
- # events).
- for record in self._session.query_multirecord(b"$arresult?"):
- logging.debug(f"Retrieved arresult: {record!r}")
- reading = _parse_arresult(record)
- if reading:
- yield reading
-
- def zero_log(self) -> None:
- self._session.send_text_command(b"$resetpatient")