无法使用trollius在python2.7的异步线程中处理kafka使用者消息

hxzsmxv2  于 2021-06-04  发布在  Kafka
关注(0)|答案(0)|浏览(242)

我是新来Kafka,所以如果我错过了什么,请让我知道?用例如下所示,有三个kafka使用者正在运行,从中读取的消息经过处理并通过post调用发送到客户端api,但是客户端所花费的时间(对于测试,我使用了8分钟)不是固定的,因此它可以大于max.poll.timeout(对于本例,我使用了6分钟)。为此,我尝试在客户机发回响应时暂停()消费者,然后使用poll()调用resume()从kafka代理获取下一条消息。我尝试用下面的代码在异步线程中进行处理,但是这不是异步执行的,重新平衡正在发生,然后所有的使用者都离开了组。我如何处理这个用例,为什么代码不能异步执行?
版本:
python 2.7版
Kafkapython 1.4.6
巨魔2.2.1

import trollius as asyncio
from trollius import From

for message in consumer:
    consumer.pause(*consumer.assignment())
    loop = asyncio.get_event_loop()
    tasks = [
        asyncio.ensure_future(thread_message_processor(consumer)),
    ]
    loop.run_until_complete(asyncio.wait(tasks))
    loop.close()
    .....

@asyncio.coroutine
def thread_message_processor(consumer):
    # for testing I am using sleep as time taken by client API for processing
    yield From(asyncio.sleep(480))
    consumer.resume(*consumer.assignment())

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题