我想为Python请求库添加一个websockets适配器,想知道是否有人已经实现了这个适配器?类似于HTTP适配器,但用于websockets。我正在尝试连接到默认情况下不使用requests库的WebSocket端点(ws://)。我希望添加一个适配器,该适配器可以使用requests库处理所有ws://和wss://协议
ws://
wss://
3htmauhk1#
a.)使用websockets库(如果你真的想的话,和requests一起)
websockets
requests
import asyncio import requests import websockets async def send_message(message): async with websockets.connect("ws://echo.websocket.org") as websocket: await websocket.send(message) response = await websocket.recv() return response loop = asyncio.get_event_loop() response = loop.run_until_complete(send_message("Hello, World!")) print(response)
B.)改用aiohttp
aiohttp
import aiohttp import asyncio async def send_message(message): async with aiohttp.ClientSession() as session: async with session.ws_connect("ws://echo.websocket.org") as ws: await ws.send_str(message) response = await ws.receive() return response.data loop = asyncio.get_event_loop() response = loop.run_until_complete(send_message("Hello, World!")) print(response)
c.)使用requests_html
requests_html
from requests_html import AsyncHTMLSession async def send_message(message): async with AsyncHTMLSession() as session: websocket = await session.ws_connect("ws://echo.websocket.org") await websocket.send_str(message) response = await websocket.receive() return response.text loop = asyncio.get_event_loop() response = loop.run_until_complete(send_message("Hello, World!")) print(response)
1条答案
按热度按时间3htmauhk1#
a.)使用
websockets
库(如果你真的想的话,和requests
一起)B.)改用
aiohttp
c.)使用
requests_html