目前,我正在尝试从kafka主题读取数据,并使用从kafka主题获取的数据异步调用restapi。这里restapi立即给出响应,如果msg是meher,否则响应将需要5秒
Kafka资料
Waldo
Meher
Waldo
Meher
Waldo
Meher
Waldo
Meher
Waldo
Meher
代码如下:
app = faust.App(
'faustApp',
broker="kafka://localhost:9092",
value_serializer='raw',
)
app_topic = app.topic('topic_base')
@app.agent(app_topic,concurrency=1)
async def imports_news(articles):
async for article in articles:
val = article.decode('utf-8')
url = 'http://0.0.0.0:5050/' + val
async with aiohttp.ClientSession() as session:
async with session.get(url) as resp:
print(data)
if __name__ == '__main__':
app.main()
电流输出:
Hello Meher!
Hello Waldo!
Hello Meher!
Hello Waldo!
Hello Meher!
Hello Waldo!
Hello Meher!
Hello Waldo!
Hello Meher!
Hello Waldo!
预期产量:
Hello Meher!
Hello Meher!
Hello Meher!
Hello Meher!
Hello Meher!
Hello Waldo!
Hello Waldo!
Hello Waldo!
Hello Waldo!
Hello Waldo!
预期的是获得所有rest调用的响应,第一个是即时响应,第二个是延迟响应,但目前它是按顺序工作的。
如果我将并发性增加到5,它将提供预期的输出,但是在并发性为1的情况下应该使用相同的输出。不确定,如果我遗漏了什么…有什么帮助吗?
更新1:
我已经用普通的python asyncio尝试了同样的方法。它按预期工作
import asyncio
import aiofiles
import aiohttp
async def get_player(player_name):
url = 'http://0.0.0.0:5050/' + player_name
async with aiohttp.ClientSession() as session:
async with session.get(url) as resp:
data = await resp.text()
print(data)
loop = asyncio.get_event_loop()
player_args = ["Meher", "Waldo", "Meher", "Waldo", "Meher", "Waldo", "Meher", "Waldo", "Meher", "Waldo", "Meher", "Waldo", "Meher", "Waldo",
"Meher", "Waldo", "Meher", "Waldo", "Meher", "Waldo", "Meher", "Waldo", "Meher", "Waldo", "Meher", "Waldo", "Meher", "Waldo",
"Meher", "Waldo", "Meher", "Waldo", "Meher", "Waldo", "Meher", "Waldo", "Meher", "Waldo", "Meher", "Waldo", "Meher", "Waldo"]
loop.run_until_complete(
asyncio.gather(
*(get_player(args) for args in player_args)
)
)
暂无答案!
目前还没有任何答案,快来回答吧!