如何在合流kafka python中读取批处理消息?

lb3vh1jj  于 2021-06-07  发布在  Kafka
关注(0)|答案(1)|浏览(324)

我试图阅读Kafka的信息,所以我写了一个简单的消费者阅读Kafka的信息。

While True:
        message = consumer.poll(timeout=1.0)
        # i am doing something with messages

在上面的代码中,消息类型的输出是消息对象。我如何获得一个消息数组?
有没有可能??
注意:没有太多的消费者配置只是基本的。

wribegjk

wribegjk1#

librdkafka(底层的c库)只向应用程序逐个返回消息,但在内部,消息是通过批处理从代理中获取的,因此没有性能下降。消息在内部缓冲区中排队,等待应用程序轮询。
有一些配置可以调整行为: fetch.wait.max.ms (默认值为100),指定给代理的时间,用于累积要发送的数据 fetch.message.max.bytes (默认为1048576,1gb),最大批量大小 queued.max.messages.kbytes (默认值1000000),内部队列中数据的最大大小。如果您不定期轮询,数据将不会从队列中清除,您将无法获取更多数据。
还有很多其他的你可以在这里找到:https://github.com/edenhill/librdkafka/blob/0.11.0.x/configuration.md
如果由于处理数据的方式而确实需要一个数据数组,那么您可以像这样在循环中以低超时调用poll,并在有x条消息或在y ms之后停止循环,将它们累积到一个集合中。处理生成的数组并重复循环。
生成也一样:您一个接一个地生成数据,但是消息在发送到代理之前是批处理的。

相关问题