power-profiler-client/ble_interface.py
2023-06-17 17:17:20 +02:00

263 lines
8.7 KiB
Python

from asyncio import AbstractEventLoop
import asyncio
from typing import Dict, dataclass_transform
from bleak import BleakClient, BleakScanner
from bleak.backends.characteristic import BleakGATTCharacteristic
from bleak.backends.service import BleakGATTService
from bleak.uuids import normalize_uuid_str
import logging
import struct
import myUUIDs
class ble_interface:
CURRENT = "current"
CURRENT_F = "current_f"
VOLTAGE = "volts"
CONFIGURATION = "conf"
RANGE = "range"
RFRSH_DELAY = "refresh_delay"
ON_CONNECT = "on_connect"
def __init__(self, address, loop: AbstractEventLoop, logger : logging.Logger) -> None:
self.address = address
self.loop = loop
self.client: BleakClient
self.log = logger
self.callbacks = {}
self.services: Dict[str, BleakGATTService] = {}
self.handlers = {
self.CURRENT: self.__current_meas_handler,
self.CURRENT_F: self.__current_f_meas_handler,
self.VOLTAGE: self.__voltage_meas_handler,
self.RANGE: self.__range_handler,
}
self.characteristics_gain: Dict[str, Dict[int, int]] = {} # for every service, links handle with gain
async def __connect(self):
device = await BleakScanner.find_device_by_address(self.address, cb=dict(use_bdaddr=False))
if device is None:
self.log.error("could not find device with address")
return
async with BleakClient(device) as client:
self.log.info("Connected")
self.client = client
services = client.services.services
self.parse_services(services)
await self.get_associated_gains(self.CURRENT)
await self.get_associated_gains(self.VOLTAGE)
self.call_callback(self.ON_CONNECT)
await asyncio.Event().wait()
async def get_associated_gains(self, id: str):
s = self.services[id]
for c in s.characteristics:
self.log.info(c.descriptors)
descr = c.get_descriptor(normalize_uuid_str(myUUIDs.SOURCE_GAIN_DESCR))
if(not descr):
self.log.info("descr not found")
continue
data = await self.client.read_gatt_descriptor(descr.handle);
gain = struct.unpack("<I", data)[0]
self.log.info("descr %d", gain)
if not self.characteristics_gain.get(id):
self.characteristics_gain[id] = {}
self.characteristics_gain[id][c.handle] = gain
def parse_services(self, services: Dict[int, BleakGATTService]):
for i in services:
s = services[i]
if(s.uuid == normalize_uuid_str(myUUIDs.METROLOGY_SERVICE)):
if(s.characteristics[0].uuid == normalize_uuid_str(myUUIDs.ELECTRIC_CURRENT_CHAR)):
if(len(s.characteristics) == 1):
self.services[self.CURRENT_F] = s
else:
self.services[self.CURRENT] = s
if(s.characteristics[0].uuid == normalize_uuid_str(myUUIDs.VOLTAGE_CHAR)):
self.services[self.VOLTAGE] = s
if(s.uuid == normalize_uuid_str(myUUIDs.CONFIGURATION_SERVICE)):
self.services[self.CONFIGURATION] = s
if(s.uuid == normalize_uuid_str(myUUIDs.METROLOGY_RANGE_SERVICE)):
self.services[self.RANGE] = s
def connect(self):
self.log.info("connecting")
self.loop.create_task(self.__connect())
def subscribe_to(self, to: str, char_uuid: str | None = None):
self.loop.create_task(self.__subscribe_to(to, char_uuid))
async def __subscribe_to(self, to: str, char_uuid: str | None):
self.log.info("subscribing to %s", to)
for c in filter(lambda c: (not char_uuid) or (c.uuid == char_uuid), self.services[to].characteristics):
await self.client.start_notify(c, self.handlers[to])
def add_callback(self, id: str, callback):
if(not id in self.callbacks):
self.callbacks[id] = []
self.callbacks[id].append(callback)
def call_callback(self, id: str, *params):
if(id in self.callbacks):
for c in self.callbacks[id]:
if(params):
c(self, *params)
else:
c(self)
def __current_meas_handler(self, char: BleakGATTCharacteristic, data: bytearray):
val = struct.unpack("<I", data)[0]
gain = self.characteristics_gain[self.CURRENT][char.handle]
self.call_callback(self.CURRENT, gain, val)
def __current_f_meas_handler(self, char: BleakGATTCharacteristic, data: bytearray):
val = struct.unpack("<I", data)[0]
self.call_callback(self.CURRENT_F, "Fused", val)
def __range_handler(self, char: BleakGATTCharacteristic, data: bytearray):
val = struct.unpack("<i", data)[0]
self.call_callback(self.RANGE, val)
def __voltage_meas_handler(self, char: BleakGATTCharacteristic, data: bytearray):
val = struct.unpack("<I", data)[0]
gain = self.characteristics_gain[self.VOLTAGE][char.handle]
self.call_callback(self.VOLTAGE, gain, val)
async def get_refresh_delay(self) -> int:
s = self.services[self.CONFIGURATION]
c = s.get_characteristic(myUUIDs.SAMPLING_RATE_CHAR);
if(not c):
return -1
val = await self.client.read_gatt_char(c)
return struct.unpack("<I", val)[0]
async def get_zero_cali_nsamp(self) -> int:
s = self.services[self.CONFIGURATION]
c = s.get_characteristic(myUUIDs.ZERO_CALI_NSAMP);
if(not c):
return -1
val = await self.client.read_gatt_char(c)
return struct.unpack("<I", val)[0]
async def get_auto_range(self) -> bool:
s = self.services[self.RANGE]
c = s.get_characteristic(myUUIDs.AUTO_RANGE_CHAR);
if(not c):
return False
val = await self.client.read_gatt_char(c)
self.log.info(val)
return struct.unpack("<i", val)[0] != 0
async def get_range(self) -> bool:
s = self.services[self.RANGE]
c = s.get_characteristic(myUUIDs.ELECTRIC_CURRENT_RANGE_CHAR);
if(not c):
return False
val = await self.client.read_gatt_char(c)
self.log.info(val)
return struct.unpack("<i", val)[0] != 0
def update_refresh_delay(self, val: int):
self.loop.create_task(self.__update_refresh_delay(val))
async def __update_refresh_delay(self, val: int):
s = self.services[self.CONFIGURATION]
c = s.get_characteristic(myUUIDs.SAMPLING_RATE_CHAR)
if(not c):
return
data = struct.pack("<I", val)
await self.client.write_gatt_char(c, data)
def zero_cali(self):
self.loop.create_task(self.__zero_cali())
async def __zero_cali(self):
s = self.services[self.CONFIGURATION]
c = s.get_characteristic(myUUIDs.ZERO_CALI_CHAR)
if(not c):
return
data = struct.pack("<B", 1)
await self.client.write_gatt_char(c, data)
def update_zero_cali_nsamp(self, val: int):
self.loop.create_task(self.__update_zero_cali_nsamp(val))
async def __update_zero_cali_nsamp(self, val: int):
s = self.services[self.CONFIGURATION]
c = s.get_characteristic(myUUIDs.ZERO_CALI_NSAMP)
if(not c):
return
data = struct.pack("<I", val)
await self.client.write_gatt_char(c, data)
def reset_offsets(self):
self.loop.create_task(self.__reset_offsets())
async def __reset_offsets(self):
s = self.services[self.CONFIGURATION]
c = s.get_characteristic(myUUIDs.ZERO_CALI_RESET)
if(not c):
return
data = struct.pack("<B", 1)
await self.client.write_gatt_char(c, data)
def update_auto_range(self, val: bool):
self.loop.create_task(self.__update_auto_range(val))
async def __update_auto_range(self, val: bool):
s = self.services[self.RANGE]
c = s.get_characteristic(myUUIDs.AUTO_RANGE_CHAR)
if(not c):
return
data = struct.pack("<B", val)
await self.client.write_gatt_char(c, data)
def update_range(self, val: int):
self.loop.create_task(self.__update_range(val))
async def __update_range(self, val: int):
s = self.services[self.RANGE]
c = s.get_characteristic(myUUIDs.ELECTRIC_CURRENT_RANGE_CHAR)
if(not c):
return
data = struct.pack("<i", val)
await self.client.write_gatt_char(c, data)