同步运行faust代理

j8yoct9x  于 2021-06-04  发布在  Kafka
关注(0)|答案(2)|浏览(345)

检查以下代码

@app.agent()
async def process(stream):
    async for value in stream.take(5000, within=5):
        process(value)

代理在5秒内异步获取5000条记录并对其进行处理。我不希望代理在前一条记录处理完成之前再挑选50万条记录。基本上我想同步运行代理。有办法吗?

col17t5w

col17t5w1#

我尝试使用以下代码来查看工人是否正在执行第二批记录,而第一批记录的处理尚未完成

@app.agent()
async def process(stream):
    async for value in stream.take(5000, within=5):
        print(1)
        await async.sleep(30)

工人打印了 1 等了30秒才打印出来 2 . await语句将控制权返回给事件循环,但在本例中,它等待了,这意味着批处理是一个接一个地执行的。因此这是同步的。
提交偏移、重新平衡、监视等都是由事件循环处理的异步操作。

wbgh16ku

wbgh16ku2#

我认为可以在代理上将并发性设置为1,这样可以有效地使其同步。
如果这样做的话,您可能还会发现修改主题分区是有用的,但是我对这两个设置之间的关系没有完全的了解(只是想指出一个潜在的有用途径)。

相关问题