使用python的kafka消费者投票消息

pkmbmrz7  于 2021-06-07  发布在  Kafka
关注(0)|答案(1)|浏览(419)

我在一个消费者群体中对Kafka的信息进行投票时遇到了问题。我的使用者对象分配给给定的分区

self.ps = TopicPartition(topic, partition )

在此之后,使用者将分配给该分区:

self.consumer.assign([self.ps])

之后,我可以用

self.consumer.seek_to_beginning(self.ps)
pos = self.consumer.position(self.ps)

以及 self.consumer.seek_to_end(self.ps) .....
在我的主题中有超过30000条信息。问题是我只收到一条信息。
消费者配置:
max_poll_records= 200 AUTO_OFFSET_RESET 是最早的
这是我的功能,我试图得到以下信息:

def poll_messages(self):

    data = []

    messages = self.consumer.poll(timeout_ms=6000)

    for partition, msgs in six.iteritems(messages):

        for msg in msgs:

            data.append(msg)

    return data

即使在开始轮询消息之前转到第一个可用的偏移量,我也只得到一条消息。

self.consumer.seek(self.ps, self.get_first_offset())

我希望有人能解释我做错了什么。提前谢谢。
致以最良好的祝愿jö注册护士

4szc88ey

4szc88ey1#

我相信你误解了最高民意调查记录-这并不意味着你将得到200次民意调查,只是对你可能得到最多的限制。您需要多次致电poll。我想让您参考文档中的简单示例:http://kafka-python.readthedocs.io/en/master/usage.html
我认为更标准的实施方式是:

for message in self.consumer:
  # do stuff like:
  print(msg)

相关问题