11. BLE

Exercise goals

  • Creating a BLE peripheral and communicating with the device
  • Accessing GATT characteristics
  • Integration with the existing system

Materials

Creating a BLE peripheral

To turn the RPi Pico into a BLE peripheral, we will use ready-made code, namely the modules ble_advertising.py and ble_simple_peripheral.py. Upload both files to the RPi Pico's memory so that they can be imported.

The following code reads the temperature from the RP2040's internal temperature sensor and sends the value to the device acting as the central.

from machine import Pin, ADC
import bluetooth
from ble_simple_peripheral import BLESimplePeripheral
import time
 
# Create a Bluetooth Low Energy (BLE) object
ble = bluetooth.BLE()
 
# Create an instance of the BLESimplePeripheral class with the BLE object
sp = BLESimplePeripheral(ble)
adc = ADC(4)
 
while True:
    if sp.is_connected():  # Check if a BLE connection is established
        # Read the value from the internal temperature sensor
        adc_value = adc.read_u16()
        # Convert ADC value to voltage
        voltage = adc_value * (3.3 / 65535.0)
        # Temperature calculation based on sensor characteristics
        temperature = 27 - (voltage - 0.706) / 0.001721
        # Transmit the temperature value over BLE
        temperature_data = str(temperature).encode()
        sp.send(temperature_data)
    time.sleep(1)
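
The conversion used in the loop above can be checked on a PC without any hardware. The following is a minimal sketch that isolates the arithmetic in plain functions. The names adc_to_celsius and encode_temperature are hypothetical, and the constants (3.3 V reference, 0.706 V at 27 °C, 1.721 mV per °C) are simply the ones from the listing above, so real readings may deviate with the actual reference voltage.

```python
def adc_to_celsius(raw, vref=3.3):
    """Convert a 16-bit reading of the internal sensor to degrees Celsius."""
    voltage = raw * (vref / 65535.0)
    # Same formula as in the listing above: 0.706 V corresponds to 27 degrees C
    return 27 - (voltage - 0.706) / 0.001721


def encode_temperature(celsius):
    """Format the value with two decimals and encode it as bytes for sending."""
    return "{:.2f}".format(celsius).encode()


# A raw value of about 14020 corresponds to roughly 0.706 V, i.e. about 27 degrees C
print(encode_temperature(adc_to_celsius(14020)))
```

Limiting the number of decimals keeps the payload short, which matters because a single BLE notification carries only a small number of bytes by default.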

A BLE client on a phone can be used to test the functionality. During the lab, it is a good idea to give the device a name that is unique within the group so that it is easier to identify.

If your client can write to a BLE peripheral, you can try the following example, which toggles the LED on the RPi Pico:

from machine import Pin
import bluetooth
from ble_simple_peripheral import BLESimplePeripheral
import time

# Create a Bluetooth Low Energy (BLE) object
ble = bluetooth.BLE()

# Create an instance of the BLESimplePeripheral class with the BLE object
sp = BLESimplePeripheral(ble)

led = Pin("LED", Pin.OUT)

# Define a callback function to handle received data
def on_rx(data):
    print("Data received: ", data)  # Print the received data
    # Ignore trailing line endings, which differ between BLE clients
    if data.strip() == b'toggle':
        led.toggle()  # Toggle the LED state (on/off)

# Register the callback once; it is invoked whenever the central writes data
sp.on_write(on_rx)

# Keep the program running without busy-waiting
while True:
    time.sleep(1)

Whether this code works depends on how the BLE central writes to the device (the whole message at once, or individual bytes/characters). The code may need to be adapted.
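
One possible adaptation for centrals that send a command in several pieces is to buffer the incoming bytes until a line ending arrives. The following is a minimal sketch written in plain Python so that it can be tried on a PC. The class name LineAssembler is hypothetical, and the sketch assumes that the client terminates each command with a newline.

```python
class LineAssembler:
    """Collects written chunks and returns complete, newline-terminated commands."""

    def __init__(self):
        self._buffer = b""

    def feed(self, chunk):
        """Add a chunk; return a list of complete commands without line endings."""
        self._buffer += bytes(chunk)
        commands = []
        while b"\n" in self._buffer:
            line, self._buffer = self._buffer.split(b"\n", 1)
            line = line.strip()  # drops a trailing "\r" as well
            if line:
                commands.append(line)
        return commands


assembler = LineAssembler()
print(assembler.feed(b"tog"))      # [] because the command is not complete yet
print(assembler.feed(b"gle\r\n"))  # [b'toggle']
```

In the write callback, each command returned by feed(data) could then be compared with b'toggle'. A client that never sends a line ending would not work with this approach and would need a different delimiter or a timeout.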

BLE central with the Bleak module

To communicate with a BLE device, we need to know its physical (MAC) address. It can be read directly on the device:

import bluetooth
 
def mac2str(mac):
    return ':'.join([f"{b:02X}" for b in mac])
 
ble = bluetooth.BLE()
ble.active(True)
addr_type, mac = ble.config('mac')  # avoid shadowing the built-in type()
print(mac2str(mac))

To find out about all devices nearby, a scanner based on the Bleak library can be run on a PC.

import asyncio
from bleak import BleakScanner
 
async def main():
    # BleakScanner.discover() replaces the deprecated bleak.discover()
    devices = await BleakScanner.discover()
    for d in devices:
        print(d)
 
if __name__ == "__main__":
    asyncio.run(main())
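
In a lab with many boards, the scan result is easier to read when it is filtered by the advertised name. The following is a minimal sketch under stated assumptions: the helper filter_by_name, the prefix "pico-" and the stand-in device entries are hypothetical, while BleakScanner.discover() and the name and address attributes of the returned devices come from Bleak. Bleak is imported inside the scan function so that the filter itself can be tried without a Bluetooth adapter.

```python
import asyncio
from types import SimpleNamespace


def filter_by_name(devices, prefix):
    """Return devices whose advertised name starts with prefix (name may be None)."""
    return [d for d in devices if d.name and d.name.startswith(prefix)]


async def scan(prefix, timeout=5.0):
    # Imported here so that the filter above works without Bleak installed
    from bleak import BleakScanner

    devices = await BleakScanner.discover(timeout=timeout)
    for d in filter_by_name(devices, prefix):
        print(d.address, d.name)


def main():
    # Real scan; requires Bleak and a Bluetooth adapter
    asyncio.run(scan("pico-"))


# Offline demonstration with stand-in objects instead of real scan results
fake = [
    SimpleNamespace(name="pico-group3", address="D8:3A:DD:5D:EC:0A"),
    SimpleNamespace(name=None, address="11:22:33:44:55:66"),
]
print([d.address for d in filter_by_name(fake, "pico-")])
```

Devices that do not advertise a name report None, which is why the filter checks the name before calling startswith().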

Available services

If we know the address, the following code returns the list of services available through the GATT database:

import sys
import asyncio
 
from bleak import BleakClient
 
async def main(address: str):
    async with BleakClient(address) as client:
        svcs = client.services  # resolved on connect; replaces the deprecated get_services()
        print("Services:")
        for service in svcs:
            print(service)
 
if __name__ == "__main__":
    asyncio.run(main(sys.argv[1]))

The program takes the device address as its argument:

$ python get_services.py D8:3A:DD:5D:EC:0A
Services:
00001800-0000-1000-8000-00805f9b34fb (Handle: 1): Generic Access Profile
00001801-0000-1000-8000-00805f9b34fb (Handle: 4): Generic Attribute Profile
6e400001-b5a3-f393-e0a9-e50e24dcca9e (Handle: 7): Nordic UART Service
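
The first two UUIDs in the listing above are standard 16-bit identifiers (0x1800 and 0x1801) expanded with the Bluetooth base UUID, whereas the Nordic UART Service uses a vendor-specific 128-bit UUID. The following is a minimal sketch of that expansion; the function names expand_uuid16 and uses_bluetooth_base are hypothetical.

```python
import uuid

# Every standard short UUID is embedded in xxxxxxxx-0000-1000-8000-00805f9b34fb
BASE_UUID_SUFFIX = "-0000-1000-8000-00805f9b34fb"


def expand_uuid16(short):
    """Expand a 16-bit assigned number, e.g. 0x1800, to its 128-bit form."""
    return uuid.UUID("{:08x}{}".format(short, BASE_UUID_SUFFIX))


def uses_bluetooth_base(value):
    """Return True if the UUID is derived from the Bluetooth base UUID."""
    return str(uuid.UUID(str(value))).endswith(BASE_UUID_SUFFIX)


print(expand_uuid16(0x1800))
print(uses_bluetooth_base("6e400001-b5a3-f393-e0a9-e50e24dcca9e"))
```

Such a check is a quick way to tell standard services apart from custom ones when browsing an unknown device.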

Service explorer

To use the data offered over the BLE interface, we need to know the UUIDs of the individual services or characteristics (data from specific sensors).

import sys
import platform
import asyncio
import logging
 
from bleak import BleakClient
 
logger = logging.getLogger(__name__)
 
ADDRESS = (
    "84:FD:27:E6:5A:B9"
    if platform.system() != "Darwin"
    else "B9EA5233-37EF-4DD6-87A8-2A875E821C46"
)
 
async def main(address):
    async with BleakClient(address) as client:
        logger.info(f"Connected: {client.is_connected}")
 
        for service in client.services:
            logger.info(f"[Service] {service}")
            for char in service.characteristics:
                if "read" in char.properties:
                    try:
                        value = bytes(await client.read_gatt_char(char.uuid))
                        logger.info(
                            f"\t[Characteristic] {char} ({','.join(char.properties)}), Value: {value}"
                        )
                    except Exception as e:
                        logger.error(
                            f"\t[Characteristic] {char} ({','.join(char.properties)}), Value: {e}"
                        )
 
                else:
                    value = None
                    logger.info(
                        f"\t[Characteristic] {char} ({','.join(char.properties)}), Value: {value}"
                    )
 
                for descriptor in char.descriptors:
                    try:
                        value = bytes(
                            await client.read_gatt_descriptor(descriptor.handle)
                        )
                        logger.info(f"\t\t[Descriptor] {descriptor}) | Value: {value}")
                    except Exception as e:
                        logger.error(f"\t\t[Descriptor] {descriptor}) | Value: {e}")
 
 
if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    asyncio.run(main(sys.argv[1] if len(sys.argv) == 2 else ADDRESS))

$ python get_characteristics.py
INFO:bleak.backends.winrt.client:Services resolved for BleakClientWinRT (D8:3A:DD:5D:EC:0A)
INFO:__main__:Connected: True
INFO:__main__:[Service] 00001800-0000-1000-8000-00805f9b34fb (Handle: 1): Generic Access Profile
INFO:__main__:  [Characteristic] 00002a00-0000-1000-8000-00805f9b34fb (Handle: 2):  (read), Value: b'MPY BTSTACK'
INFO:__main__:[Service] 00001801-0000-1000-8000-00805f9b34fb (Handle: 4): Generic Attribute Profile
INFO:__main__:  [Characteristic] 00002a05-0000-1000-8000-00805f9b34fb (Handle: 5):  (read), Value: b''
INFO:__main__:[Service] 6e400001-b5a3-f393-e0a9-e50e24dcca9e (Handle: 7): Nordic UART Service
INFO:__main__:  [Characteristic] 6e400003-b5a3-f393-e0a9-e50e24dcca9e (Handle: 8):  (read,notify), Value: b''
INFO:__main__:          [Descriptor] 00002902-0000-1000-8000-00805f9b34fb (Handle: 10): Client Characteristic Configuration) | Value: b'\x00\x00'
INFO:__main__:  [Characteristic] 6e400002-b5a3-f393-e0a9-e50e24dcca9e (Handle: 11):  (write-without-response,write), Value: None

Asynchronous reading of data from a specific characteristic can then look like this, for example:

import asyncio
from bleak import BleakClient
 
BTAddr = "D8:3A:DD:5D:EC:0A"
char_uuid = "6e400003-b5a3-f393-e0a9-e50e24dcca9e"
 
AppRunning = True
Newest = None
 
def notification_handler(sender, data):
    global Newest
    Newest = data
 
async def Loop():
    global AppRunning, Newest
 
    if Newest is None:
        return
 
    data = Newest
    print(data)
 
async def main():
    print("Connecting:", BTAddr, char_uuid)
    global AppRunning
    async with BleakClient(BTAddr) as client:
        print(f"Connected: {client.is_connected}")
        await client.start_notify(char_uuid, notification_handler)
        while AppRunning:
            try:
                await asyncio.sleep(10)
                await Loop()
            except KeyboardInterrupt:
                break
        await client.stop_notify(char_uuid)
 
if __name__ == "__main__":
    asyncio.run(main())
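
The notification handler receives raw bytes. If the peripheral sends the temperature as text, as in the first listing of this tutorial (str(temperature).encode()), the value can be converted back to a number on the central side. The following is a minimal sketch; the function name parse_temperature is hypothetical, and it assumes one complete value per notification.

```python
def parse_temperature(data):
    """Decode a payload such as b'23.45' into a float; return None if malformed."""
    try:
        return float(bytes(data).decode("ascii").strip())
    except ValueError:
        # Covers both undecodable bytes and text that is not a number
        return None


print(parse_temperature(bytearray(b"23.45")))  # 23.45
print(parse_temperature(b"garbage"))           # None
```

Returning None instead of raising keeps a single corrupted or truncated notification from stopping the receiving loop.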

courses/b0b37nsi/tutorials/11.txt · Last modified: 2024/05/02 12:00 by spicajak