我的初始代码使用 aiokafka
制片人客户。
import asyncio
import aiokafka
async def foo(loops):
producer = aiokafka.AIOKafkaProducer(
loop=loop, bootstrap_servers='localhost:9092'
)
await producer.start()
try:
for _ in range(loops):
await producer.send("my_topic", b"Hello world!")
finally:
await producer.stop()
if __name__ == '__main__':
loop = asyncio.get_event_loop()
loop.run_until_complete(foo(1000))
然而,我认为 producer.send()
如果我们避免 await
在进入its的下一个迭代之前 for
循环。不幸的是,下面修改的代码给出了错误
runtimeerror:此事件循环已在运行。
我想第二个 loop.run_until_complete()
是造成这种情况的原因,但我不知道如何重写代码而不调用第二次。达到预期效果的正确方法是什么?
import asyncio
import aiokafka
async def foo(loops):
producer = aiokafka.AIOKafkaProducer(
loop=loop, bootstrap_servers='localhost:9092'
)
await producer.start()
try:
tasks = []
for _ in range(N):
tasks.append(producer.send("my_topic", b"Hello"))
loop.run_until_complete(asyncio.gather(*tasks))
finally:
await producer.stop()
if __name__ == '__main__':
loop = asyncio.get_event_loop()
loop.run_until_complete(foo(1000))
暂无答案!
目前还没有任何答案,快来回答吧!