python的asyncio支持基于协程的UDP网络API吗?

inn6fuwd  于 2023-01-16  发布在  Python
关注(0)|答案(4)|浏览(161)

今天晚上,我浏览了python asyncio模块文档,为我的一个课程项目寻找一些想法,但是我很快发现python的标准aysncio模块可能缺少一些特性。
如果你浏览一下文档,你会发现有一个基于回调的API和一个基于协程的API。回调API可以用于构建UDP和TCP应用程序,而协程API看起来只能用于构建TCP应用程序,因为它使用了流样式的API。
这给我带来了一个问题,因为我正在为UDP网络寻找一个基于协程的API,尽管我确实发现asyncio支持低级的基于协程的套接字方法,如sock_recvsock_sendall,但UDP网络的关键API recvfromsendto不在那里。
我想做的是写一些代码,如:

async def handle_income_packet(sock):
    await data, addr = sock.recvfrom(4096)
    # data handling here...
    await sock.sendto(addr, response)

我知道使用回调API可以等效地实现这一点,但这里的问题是回调不是协程,而是常规函数,因此在其中您无法将控制权交还给事件循环并保留函数执行状态。
看看上面的代码,如果我们需要在数据处理部分做一些阻塞IO操作,只要我们的IO操作也在协程中完成,那么在协程版本中就不会有问题:

async def handle_income_packet(sock):
    await data, addr = sock.recvfrom(4096)
    async with aiohttp.ClientSession() as session:
        info = await session.get(...)
    response = generate_response_from_info(info)
    await sock.sendto(addr, response)

只要我们使用await,事件循环就会从那个点开始控制流来处理其他事情,直到IO完成为止。但遗憾的是,这些代码现在不可使用,因为我们在asyncio中没有socket.sendtosocket.recvfrom的协程版本。
我们可以使用传输协议回调API来实现这一点:

class EchoServerClientProtocol(asyncio.Protocol):
    def connection_made(self, transport):
        peername = transport.get_extra_info('peername')
        self.transport = transport

    def data_received(self, data):
        info = requests.get(...)
        response = generate_response_from_info(info)
        self.transport.write(response)
        self.transport.close()

我们不能在那里await一个协程,因为回调不是协程,并且使用像上面这样的阻塞IO调用将停止回调中的控制流,并阻止循环处理任何其他事件,直到IO完成
另一个推荐的实现方法是在data_received函数中创建一个Future对象,将其添加到事件循环中,并将任何需要的状态变量存储在Protocol类中,然后显式地将控制返回给循环。虽然这可以工作,但它确实会创建许多复杂的代码,而在协程版本中,这些代码根本不需要。
这里我们还有一个使用非阻塞套接字和add_reader来处理UDP套接字的例子。但是代码看起来仍然很复杂,与协程版本的几行代码相比。
我想说的是,协程是一个非常好的设计,它可以在一个线程中利用并发的强大功能,同时也有一个非常简单的设计模式,可以节省脑力和不必要的代码行,但我们的asyncio标准库中确实缺乏使其适用于UDP网络的关键部分。
你们觉得这个怎么样?
此外,如果有任何其他的建议,第三方库支持这种API的UDP网络,我将非常感谢我的课程项目的缘故.我发现Bluelet是很像这样的东西,但它似乎没有得到积极的维护.
编辑:
这个PR似乎实现了这个特性,但是被asyncio的开发者拒绝了,开发者声称所有的功能都可以用create_datagram_endpoint()这个协议传输API来实现,但是正如我上面所讨论的,在很多用例中,协程API比回调API更简单。很遗憾,我们没有UDP协议。

ao218c7q

ao218c7q1#

不提供基于流的API的原因是因为流在回调之上提供了 * 排序 *,而UDP通信本质上是无序的,因此两者根本不兼容。
但这并不意味着你不能从回调中调用协程--事实上这很容易!从EchoServerProtocol例子开始,你可以这样做:

def datagram_received(self, data, addr):
    loop = asyncio.get_event_loop()
    loop.create_task(self.handle_income_packet(data, addr))

async def handle_income_packet(self, data, addr):
    # echo back the message, but 2 seconds later
    await asyncio.sleep(2)
    self.transport.sendto(data, addr)

这里datagram_received启动了你的handle_income_packet协程,它可以自由地等待任意数量的协程。由于协程在“后台”运行,事件循环在任何时候都不会被阻塞,datagram_received会立即返回,就像预期的那样。

4bbkushb

4bbkushb2#

您可能对this module providing high-level UDP endpoints for asyncio感兴趣:

async def main():
    # Create a local UDP enpoint
    local = await open_local_endpoint('localhost', 8888)

    # Create a remote UDP enpoint, pointing to the first one
    remote = await open_remote_endpoint(*local.address)

    # The remote endpoint sends a datagram
    remote.send(b'Hey Hey, My My')

    # The local endpoint receives the datagram, along with the address
    data, address = await local.receive()

    # Print: Got 'Hey Hey, My My' from 127.0.0.1 port 50603
    print(f"Got {data!r} from {address[0]} port {address[1]}")
yacmzcpb

yacmzcpb3#

asyncudp提供了易于使用的异步UDP套接字。
下面是一个例子:

import asyncio
import asyncudp

async def main():
    sock = await asyncudp.create_socket(remote_addr=('127.0.0.1', 9999))
    sock.sendto(b'Hello!')
    print(await sock.recvfrom())
    sock.close()

asyncio.run(main())
brc7rcf0

brc7rcf04#

我想我会把我的解决方案发布给其他可能来自搜索引擎的人。当我在Python中学习异步网络编程时,我找不到UDP的异步API。我搜索了谷歌,想知道为什么会这样,最后发现了一个旧的邮件列表,上面有关于这个问题的帖子,以及Python的创建者认为这是一个坏主意。我不同意这种说法。
是的,UDP数据包的确是无序的,可能无法到达,但是没有技术上的原因为什么在UDP中不能有可等待的API来发送/接收/打开/关闭。所以我构建了一个库并添加了它。
下面是执行异步UDP的情况:首先,启动Python REPR并支持await:
Python3-m异步

from p2pd import *

# Load internal interface details.
netifaces = await init_p2pd()

# Load the default interface.
i = await Interface(netifaces=netifaces)

# Open a UDP echo client.
route = await i.route().bind()
dest = await Address("p2pd.net", 7, route)
pipe = await pipe_open(UDP, dest, route)

# Queue all responses.
pipe.subscribe()

# Send / recv.
await pipe.send(b"echo back this data", dest.tup)
out = await pipe.recv()
print(out)

# Cleanup.
await pipe.close()

我的库解决了很多问题。它正确地处理了接口管理、地址查找和NAT枚举。它使IPv6像IPv4一样容易使用。它为UDP / TCP / Server / Client提供了相同的API。它支持点对点连接。还有一个REST API可以从其他语言使用。
您可以在https://roberts.pm/p2pd阅读更多关于Python异步网络的问题,我的库中的文档在https://p2pd.readthedocs.io/en/latest/

相关问题