faust异步kafka消息处理并发不起作用

ecr0jaav  于 2021-06-04  发布在  Kafka
关注(0)|答案(0)|浏览(215)

目前,我正在尝试从kafka主题读取数据,并使用从kafka主题获取的数据异步调用restapi。这里restapi立即给出响应,如果msg是meher,否则响应将需要5秒
Kafka资料

Waldo
Meher
Waldo
Meher
Waldo
Meher
Waldo
Meher
Waldo
Meher

代码如下:

app = faust.App(
    'faustApp',
    broker="kafka://localhost:9092",
    value_serializer='raw',
)

app_topic = app.topic('topic_base')
@app.agent(app_topic,concurrency=1)
async def imports_news(articles):
    async for article in articles:
        val = article.decode('utf-8')
        url = 'http://0.0.0.0:5050/' + val
        async with aiohttp.ClientSession() as session:
            async with session.get(url) as resp:
            print(data)
if __name__ == '__main__':
    app.main()

电流输出:

Hello Meher!
Hello Waldo!
Hello Meher!
Hello Waldo!
Hello Meher!
Hello Waldo!
Hello Meher!
Hello Waldo!
Hello Meher!
Hello Waldo!

预期产量:

Hello Meher!
Hello Meher!
Hello Meher!
Hello Meher!
Hello Meher!
Hello Waldo!
Hello Waldo!
Hello Waldo!
Hello Waldo!
Hello Waldo!

预期的是获得所有rest调用的响应,第一个是即时响应,第二个是延迟响应,但目前它是按顺序工作的。
如果我将并发性增加到5,它将提供预期的输出,但是在并发性为1的情况下应该使用相同的输出。不确定,如果我遗漏了什么…有什么帮助吗?
更新1:
我已经用普通的python asyncio尝试了同样的方法。它按预期工作

import asyncio
import aiofiles
import aiohttp

async def get_player(player_name):
    url = 'http://0.0.0.0:5050/' + player_name
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as resp:
            data = await resp.text()
    print(data)

loop = asyncio.get_event_loop()
player_args = ["Meher", "Waldo", "Meher", "Waldo", "Meher", "Waldo", "Meher", "Waldo", "Meher", "Waldo", "Meher", "Waldo", "Meher", "Waldo",
               "Meher", "Waldo", "Meher", "Waldo", "Meher", "Waldo", "Meher", "Waldo", "Meher", "Waldo", "Meher", "Waldo", "Meher", "Waldo",
               "Meher", "Waldo", "Meher", "Waldo", "Meher", "Waldo", "Meher", "Waldo", "Meher", "Waldo", "Meher", "Waldo", "Meher", "Waldo"]
loop.run_until_complete(
    asyncio.gather(
        *(get_player(args) for args in player_args)
    )
)

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题