from asyncio import AbstractEventLoop import asyncio from typing import Dict, dataclass_transform from bleak import BleakClient, BleakScanner from bleak.backends.characteristic import BleakGATTCharacteristic from bleak.backends.service import BleakGATTService from bleak.uuids import normalize_uuid_str import logging import struct import myUUIDs class ble_interface: CURRENT = "current" CURRENT_F = "current_f" VOLTAGE = "volts" CONFIGURATION = "conf" RANGE = "range" RFRSH_DELAY = "refresh_delay" ON_CONNECT = "on_connect" def __init__(self, address, loop: AbstractEventLoop, logger : logging.Logger) -> None: self.address = address self.loop = loop self.client: BleakClient self.log = logger self.callbacks = {} self.services: Dict[str, BleakGATTService] = {} self.handlers = { self.CURRENT: self.__current_meas_handler, self.CURRENT_F: self.__current_f_meas_handler, self.VOLTAGE: self.__voltage_meas_handler, self.RANGE: self.__range_handler, } self.characteristics_gain: Dict[str, Dict[int, int]] = {} # for every service, links handle with gain async def __connect(self): device = await BleakScanner.find_device_by_address(self.address, cb=dict(use_bdaddr=False)) if device is None: self.log.error("could not find device with address") return async with BleakClient(device) as client: self.log.info("Connected") self.client = client services = client.services.services self.parse_services(services) await self.get_associated_gains(self.CURRENT) await self.get_associated_gains(self.VOLTAGE) self.call_callback(self.ON_CONNECT) await asyncio.Event().wait() async def get_associated_gains(self, id: str): s = self.services[id] for c in s.characteristics: self.log.info(c.descriptors) descr = c.get_descriptor(normalize_uuid_str(myUUIDs.SOURCE_GAIN_DESCR)) if(not descr): self.log.info("descr not found") continue data = await self.client.read_gatt_descriptor(descr.handle); gain = struct.unpack(" int: s = self.services[self.CONFIGURATION] c = s.get_characteristic(myUUIDs.SAMPLING_RATE_CHAR); if(not c): return -1 val = await self.client.read_gatt_char(c) return struct.unpack(" int: s = self.services[self.CONFIGURATION] c = s.get_characteristic(myUUIDs.ZERO_CALI_NSAMP); if(not c): return -1 val = await self.client.read_gatt_char(c) return struct.unpack(" bool: s = self.services[self.RANGE] c = s.get_characteristic(myUUIDs.AUTO_RANGE_CHAR); if(not c): return False val = await self.client.read_gatt_char(c) self.log.info(val) return struct.unpack(" bool: s = self.services[self.RANGE] c = s.get_characteristic(myUUIDs.ELECTRIC_CURRENT_RANGE_CHAR); if(not c): return False val = await self.client.read_gatt_char(c) self.log.info(val) return struct.unpack("