python-3.x 使用adafruit_ble和Bleak通过BLE上的UART无延迟地发送数据

dgiusagp  于 2023-04-22  发布在  Python
关注(0)|答案(1)|浏览(442)

我想可靠地,**从使用Windows 10 64 Bit的PC向Adafruit ItsyBitsy nRF 52840发送包含数字0-128的长度为15的列表(circuitpython)。响应将从接收方发送到发送方,以便我可以确保发送了正确的数据。我想避免使用任何时间。sleep()或asyncio.sleep()在我的代码中延迟。我的当前代码如下:

发送方代码:

async def run(write_value):
    # Scan for devices
    mac_addr = "XX:XX:XX:XX:XX"
    tx_charac = "X"
    rx_charac = "X"

    #devices = await discover()
    client = BleakClient(mac_addr, timeout=30)
    await client.connect()
    await client.write_gatt_char(tx_charac, bytearray(write_value))
    await asyncio.sleep(5)
    answer = await client.read_gatt_char(rx_charac)
    print(answer)
    await client.disconnect()
    del client
    return answer

answer = asyncio.run(run(x))

接收端编码

from adafruit_ble import BLERadio
from adafruit_ble.advertising.standard import ProvideServicesAdvertisement
from adafruit_ble.services.nordic import UARTService

ble = BLERadio()
uart = UARTService()
advertisement = ProvideServicesAdvertisement(uart)

while True:
    print("running")
    ble.start_advertising(advertisement)
    print("waiting to connect to BLE central")
    while not ble.connected:
        pass
    print("connected")
    while ble.connected:
        s = uart.read()
        uart.write(s)
        if s:
            sequence = [x for x in s]
            if len(sequence)>1:
                #Do some processing
                print(sequence)
            del s
            del sequence

不幸的是,代码似乎工作不可靠。
我能做些什么来提高发送和接收过程的可靠性、可重复性和可扩展性?
感谢您的评分

我遇到以下问题:

  • 首先,我总是要引入一个等待时间,以接收正确的回声。
  • 有时设备在一段时间后不想再重新连接,返回错误in connect self._device_info = device.details.adv.bluetooth_address AttributeError: 'NoneType' object has no attribute 'bluetooth_address' "in connect self._device_info = device.details.adv.bluetooth_address AttributeError: 'NoneType' object has no attribute 'bluetooth_address'
  • 此外,稍后我想发送包含超过15个元素(例如100个)的列表,但我还不确定如何准确地一次传输大量数据。
q0qdq0h2

q0qdq0h21#

如果你想要一个可靠的写入,那么write_gatt_char方法有一个response参数。这不会向用户返回任何东西,但在蓝牙堆栈的较低级别上,它会得到一个响应,说写入成功。
我没有Adafruit ItsyBitsy,但我有一个不同的Nordic开发板,我在上面设置了一个UART echo服务器,它可以反转发送的值。这对我来说似乎很可靠。

import asyncio
from bleak import BleakClient

async def run(write_value):
    mac_addr = "E1:4B:6C:22:56:F0"
    tx_charac = "6e400003-b5a3-f393-e0a9-e50e24dcca9e"
    rx_charac = "6e400002-b5a3-f393-e0a9-e50e24dcca9e"

    async with BleakClient(mac_addr, timeout=30) as client:
        await client.write_gatt_char(
            tx_charac, 
            bytearray(write_value), 
            response=True)

asyncio.run(run(b"desserts#"))

如果你想从服务器获取响应值,那么在响应特征上启动通知可能是一个很好的方法。你仍然会在那里有一个睡眠,但只是在等待响应时保持循环活跃。例如。

import asyncio
from bleak import BleakClient

async def run(write_value):
    mac_addr = "E1:4B:6C:22:56:F0"
    tx_charac = "6e400003-b5a3-f393-e0a9-e50e24dcca9e"
    rx_charac = "6e400002-b5a3-f393-e0a9-e50e24dcca9e"

    loop = asyncio.get_event_loop()
    async with BleakClient(mac_addr, timeout=30) as client:
        def response_handler(sender, data: bytes):
            print(f"Information from {sender}")
            print(f"Data returned: {data.decode('ascii')}")
            loop.create_task(client.disconnect())
        await client.start_notify(rx_charac, response_handler)
        await client.write_gatt_char(
            tx_charac,
            bytearray(write_value),
            response=True)
        while client.is_connected:
            await asyncio.sleep(0.1)

asyncio.run(run(b"desserts#"))

它给出了输出:

Information from 6e400002-b5a3-f393-e0a9-e50e24dcca9e (Handle: 43): Nordic UART RX
Data returned: stressed

关于写入长值,您可以查询支持的最大大小:https://bleak.readthedocs.io/en/latest/api/index.html#bleak.backends.characteristic.BleakGATTCharacteristic.max_write_without_response_size
这通常是20字节。你需要把你的数据分成20字节的块来写。

相关问题