我试图阅读Kafka的信息,所以我写了一个简单的消费者阅读Kafka的信息。
While True: message = consumer.poll(timeout=1.0) # i am doing something with messages
在上面的代码中,消息类型的输出是消息对象。我如何获得一个消息数组?有没有可能??注意:没有太多的消费者配置只是基本的。
wribegjk1#
librdkafka(底层的c库)只向应用程序逐个返回消息,但在内部,消息是通过批处理从代理中获取的,因此没有性能下降。消息在内部缓冲区中排队,等待应用程序轮询。有一些配置可以调整行为: fetch.wait.max.ms (默认值为100),指定给代理的时间,用于累积要发送的数据 fetch.message.max.bytes (默认为1048576,1gb),最大批量大小 queued.max.messages.kbytes (默认值1000000),内部队列中数据的最大大小。如果您不定期轮询,数据将不会从队列中清除,您将无法获取更多数据。还有很多其他的你可以在这里找到:https://github.com/edenhill/librdkafka/blob/0.11.0.x/configuration.md如果由于处理数据的方式而确实需要一个数据数组,那么您可以像这样在循环中以低超时调用poll,并在有x条消息或在y ms之后停止循环,将它们累积到一个集合中。处理生成的数组并重复循环。生成也一样:您一个接一个地生成数据,但是消息在发送到代理之前是批处理的。
fetch.wait.max.ms
fetch.message.max.bytes
queued.max.messages.kbytes
1条答案
按热度按时间wribegjk1#
librdkafka(底层的c库)只向应用程序逐个返回消息,但在内部,消息是通过批处理从代理中获取的,因此没有性能下降。消息在内部缓冲区中排队,等待应用程序轮询。
有一些配置可以调整行为:
fetch.wait.max.ms
(默认值为100),指定给代理的时间,用于累积要发送的数据fetch.message.max.bytes
(默认为1048576,1gb),最大批量大小queued.max.messages.kbytes
(默认值1000000),内部队列中数据的最大大小。如果您不定期轮询,数据将不会从队列中清除,您将无法获取更多数据。还有很多其他的你可以在这里找到:https://github.com/edenhill/librdkafka/blob/0.11.0.x/configuration.md
如果由于处理数据的方式而确实需要一个数据数组,那么您可以像这样在循环中以低超时调用poll,并在有x条消息或在y ms之后停止循环,将它们累积到一个集合中。处理生成的数组并重复循环。
生成也一样:您一个接一个地生成数据,但是消息在发送到代理之前是批处理的。