检查以下代码
@app.agent() async def process(stream): async for value in stream.take(5000, within=5): process(value)
代理在5秒内异步获取5000条记录并对其进行处理。我不希望代理在前一条记录处理完成之前再挑选50万条记录。基本上我想同步运行代理。有办法吗?
col17t5w1#
我尝试使用以下代码来查看工人是否正在执行第二批记录,而第一批记录的处理尚未完成
@app.agent() async def process(stream): async for value in stream.take(5000, within=5): print(1) await async.sleep(30)
工人打印了 1 等了30秒才打印出来 2 . await语句将控制权返回给事件循环,但在本例中,它等待了,这意味着批处理是一个接一个地执行的。因此这是同步的。提交偏移、重新平衡、监视等都是由事件循环处理的异步操作。
1
2
wbgh16ku2#
我认为可以在代理上将并发性设置为1,这样可以有效地使其同步。如果这样做的话,您可能还会发现修改主题分区是有用的,但是我对这两个设置之间的关系没有完全的了解(只是想指出一个潜在的有用途径)。
2条答案
按热度按时间col17t5w1#
我尝试使用以下代码来查看工人是否正在执行第二批记录,而第一批记录的处理尚未完成
工人打印了
1
等了30秒才打印出来2
. await语句将控制权返回给事件循环,但在本例中,它等待了,这意味着批处理是一个接一个地执行的。因此这是同步的。提交偏移、重新平衡、监视等都是由事件循环处理的异步操作。
wbgh16ku2#
我认为可以在代理上将并发性设置为1,这样可以有效地使其同步。
如果这样做的话,您可能还会发现修改主题分区是有用的,但是我对这两个设置之间的关系没有完全的了解(只是想指出一个潜在的有用途径)。