2023-06-15 17:53:12 +02:00

55 lines
1.6 KiB
Python

# -*- coding: utf-8 -*-
import argparse
import asyncio
import logging
import struct
from bleak import BleakClient, BleakScanner
from bleak.backends.characteristic import BleakGATTCharacteristic
from bleak.uuids import normalize_uuid_str
import myUUIDs
logger = logging.getLogger(__name__)
def current_meas_handler(characteristic: BleakGATTCharacteristic, data: bytearray):
    """Log a single notification payload as a scaled reading.

    The payload is decoded as exactly one little-endian unsigned 32-bit
    integer, multiplied by 1.0e-6, and logged at INFO level next to the
    characteristic's description.

    Args:
        characteristic: The characteristic that produced the notification.
        data: Raw notification payload; ``struct.unpack`` requires it to be
            exactly 4 bytes long and raises ``struct.error`` otherwise.
    """
    # NOTE(review): the 1.0e-6 factor presumably converts micro-units to
    # base units -- confirm against the device's characteristic spec.
    (raw_reading,) = struct.unpack("<I", data)
    scaled_reading = raw_reading * 1.0e-6
    logger.info("%s: %f", characteristic.description, scaled_reading)
async def main(address: str = "84:F7:03:1B:C6:A2") -> None:
    """Connect to a BLE device and log notifications from its metrology service.

    Scans for the device with the given address, connects to it, looks up
    the metrology service that contains the electric-current characteristic,
    subscribes ``current_meas_handler`` to every characteristic of that
    service, and then waits until the task is cancelled.

    Returns early (after logging an error) when the device or the expected
    service cannot be found.

    Args:
        address: Bluetooth address of the device to connect to. Defaults to
            the address that was previously hard-coded here.
    """
    logger.info("starting scan...")
    device = await BleakScanner.find_device_by_address(
        address, cb=dict(use_bdaddr=False)
    )
    if device is None:
        logger.error("could not find device with address %s", address)
        return

    logger.info("connecting to device...")
    async with BleakClient(device) as client:
        logger.info("Connected")

        # Normalise the target UUID once rather than on every comparison.
        metrology_uuid = normalize_uuid_str(myUUIDs.METROLOGY_SERVICE)
        metrology_svcs = [
            svc
            for svc in client.services.services.values()
            if svc.uuid == metrology_uuid
        ]

        # A default is required here: a bare next() raises StopIteration
        # when nothing matches, which a coroutine turns into an opaque
        # RuntimeError instead of a useful diagnostic.
        current_meas_svc = next(
            (
                svc
                for svc in metrology_svcs
                if svc.get_characteristic(myUUIDs.ELECTRIC_CURRENT_CHAR)
            ),
            None,
        )
        if current_meas_svc is None:
            logger.error(
                "could not find a metrology service with the electric "
                "current characteristic"
            )
            return

        for char in current_meas_svc.characteristics:
            await client.start_notify(char, current_meas_handler)

        # This event is never set, so the coroutine stays subscribed until
        # the task is cancelled; leaving the ``async with`` block then
        # closes the client.
        await asyncio.Event().wait()
if __name__ == "__main__":
    logging.basicConfig(
        level="INFO",
        format="%(asctime)-15s %(name)-8s %(levelname)s: %(message)s",
    )
    # asyncio.run() creates and closes the event loop itself and returns as
    # soon as main() finishes or raises. The previous create_task() +
    # run_forever() pairing kept the process alive forever after an early
    # return from main() and never surfaced an exception raised inside it.
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        # Ctrl+C cancels main(), which unwinds its ``async with`` block
        # before the interrupt is re-raised here.
        logger.info("interrupted, exiting")