power-profiler-client/ble_interface.py
2023-06-16 16:19:53 +02:00

223 lines
7.1 KiB
Python

from asyncio import AbstractEventLoop
import asyncio
from typing import Dict
from bleak import BleakClient, BleakScanner
from bleak.backends.characteristic import BleakGATTCharacteristic
from bleak.backends.service import BleakGATTService
from bleak.uuids import normalize_uuid_str
import logging
import struct
import myUUIDs
class ble_interface:
CURRENT = "current"
VOLTAGE = "volts"
CONFIGURATION = "conf"
RANGE = "range"
RFRSH_DELAY = "refresh_delay"
ON_CONNECT = "on_connect"
def __init__(self, address, loop: AbstractEventLoop, logger : logging.Logger) -> None:
self.address = address
self.loop = loop
self.client: BleakClient
self.log = logger
self.callbacks = {}
self.services: Dict[str, BleakGATTService] = {}
self.handlers = {
self.CURRENT: self.__current_meas_handler,
self.RANGE: self.__range_handler,
}
async def __connect(self):
device = await BleakScanner.find_device_by_address(self.address, cb=dict(use_bdaddr=False))
if device is None:
self.log.error("could not find device with address")
return
async with BleakClient(device) as client:
self.log.info("Connected")
self.client = client
self.call_callback(self.ON_CONNECT)
services = client.services.services
self.parse_services(services)
await asyncio.Event().wait()
def parse_services(self, services: Dict[int, BleakGATTService]):
for i in services:
s = services[i]
if(s.uuid == normalize_uuid_str(myUUIDs.METROLOGY_SERVICE)):
if(s.characteristics[0].uuid == normalize_uuid_str(myUUIDs.ELECTRIC_CURRENT_CHAR)):
self.services[self.CURRENT] = s
if(s.characteristics[0].uuid == normalize_uuid_str(myUUIDs.VOLTAGE_CHAR)):
self.services[self.VOLTAGE] = s
if(s.uuid == normalize_uuid_str(myUUIDs.CONFIGURATION_SERVICE)):
self.services[self.CONFIGURATION] = s
if(s.uuid == normalize_uuid_str(myUUIDs.METROLOGY_RANGE_SERVICE)):
self.services[self.RANGE] = s
def connect(self):
self.log.info("connecting")
self.loop.create_task(self.__connect())
def subscribe_to(self, to: str, char_uuid: str | None = None):
self.loop.create_task(self.__subscribe_to(to, char_uuid))
async def __subscribe_to(self, to: str, char_uuid: str | None):
self.log.info("subscribing to %s", to)
for c in filter(lambda c: (not char_uuid) or (c.uuid == char_uuid), self.services[to].characteristics):
await self.client.start_notify(c, self.handlers[to])
def add_callback(self, id: str, callback):
if(not id in self.callbacks):
self.callbacks[id] = []
self.callbacks[id].append(callback)
def call_callback(self, id: str, *params):
if(id in self.callbacks):
for c in self.callbacks[id]:
if(params):
c(self, *params)
else:
c(self)
def __current_meas_handler(self, char: BleakGATTCharacteristic, data: bytearray):
val = struct.unpack("<I", data)[0]
handle = next(i for i,c in enumerate(self.services[self.CURRENT].characteristics) if c.handle == char.handle)
self.call_callback(self.CURRENT, handle, val)
def __range_handler(self, char: BleakGATTCharacteristic, data: bytearray):
val = struct.unpack("<i", data)[0]
self.call_callback(self.RANGE, val)
async def get_refresh_delay(self) -> int:
s = self.services[self.CONFIGURATION]
c = s.get_characteristic(myUUIDs.SAMPLING_RATE_CHAR);
if(not c):
return -1
val = await self.client.read_gatt_char(c)
return struct.unpack("<I", val)[0]
async def get_zero_cali_nsamp(self) -> int:
s = self.services[self.CONFIGURATION]
c = s.get_characteristic(myUUIDs.ZERO_CALI_NSAMP);
if(not c):
return -1
val = await self.client.read_gatt_char(c)
return struct.unpack("<I", val)[0]
async def get_auto_range(self) -> bool:
s = self.services[self.RANGE]
c = s.get_characteristic(myUUIDs.AUTO_RANGE_CHAR);
if(not c):
return False
val = await self.client.read_gatt_char(c)
self.log.info(val)
return struct.unpack("<i", val)[0] != 0
async def get_range(self) -> bool:
s = self.services[self.RANGE]
c = s.get_characteristic(myUUIDs.ELECTRIC_CURRENT_RANGE_CHAR);
if(not c):
return False
val = await self.client.read_gatt_char(c)
self.log.info(val)
return struct.unpack("<i", val)[0] != 0
def update_refresh_delay(self, val: int):
self.loop.create_task(self.__update_refresh_delay(val))
async def __update_refresh_delay(self, val: int):
s = self.services[self.CONFIGURATION]
c = s.get_characteristic(myUUIDs.SAMPLING_RATE_CHAR)
if(not c):
return
data = struct.pack("<I", val)
await self.client.write_gatt_char(c, data)
def zero_cali(self):
self.loop.create_task(self.__zero_cali())
async def __zero_cali(self):
s = self.services[self.CONFIGURATION]
c = s.get_characteristic(myUUIDs.ZERO_CALI_CHAR)
if(not c):
return
data = struct.pack("<B", 1)
await self.client.write_gatt_char(c, data)
def update_zero_cali_nsamp(self, val: int):
self.loop.create_task(self.__update_zero_cali_nsamp(val))
async def __update_zero_cali_nsamp(self, val: int):
s = self.services[self.CONFIGURATION]
c = s.get_characteristic(myUUIDs.ZERO_CALI_NSAMP)
if(not c):
return
data = struct.pack("<I", val)
await self.client.write_gatt_char(c, data)
def reset_offsets(self):
self.loop.create_task(self.__reset_offsets())
async def __reset_offsets(self):
s = self.services[self.CONFIGURATION]
c = s.get_characteristic(myUUIDs.ZERO_CALI_RESET)
if(not c):
return
data = struct.pack("<B", 1)
await self.client.write_gatt_char(c, data)
def update_auto_range(self, val: bool):
self.loop.create_task(self.__update_auto_range(val))
async def __update_auto_range(self, val: bool):
s = self.services[self.RANGE]
c = s.get_characteristic(myUUIDs.AUTO_RANGE_CHAR)
if(not c):
return
data = struct.pack("<B", val)
await self.client.write_gatt_char(c, data)
def update_range(self, val: int):
self.loop.create_task(self.__update_range(val))
async def __update_range(self, val: int):
s = self.services[self.RANGE]
c = s.get_characteristic(myUUIDs.ELECTRIC_CURRENT_RANGE_CHAR)
if(not c):
return
data = struct.pack("<i", val)
await self.client.write_gatt_char(c, data)