python—有时一个新的消费群体不起作用

uqjltbpv  于 2021-06-07  发布在  Kafka
关注(0)|答案(1)|浏览(298)

我在生产中见过一次(我不记得我们是如何解决的),现在我可以在集成测试中重复这一点,集成测试总是从全新的kafka安装开始。事情是这样的:
步骤1:尚未存在的组的使用者订阅尚未存在的主题并开始轮询。

self.kafka_consumer = confluent_kafka.Consumer({
    'group.id': 'mygroup',
    'bootstrap.servers': 'kafka:9092',
    'enable.auto.commit': False,
    'auto.offset.reset': 'earliest',
})
self.kafka_consumer.subscribe('mytopic')

第二步:制作人向主题写一条消息。
结果:
大约一半的时间它工作良好;消费者正确阅读信息。
另一半时间消费者似乎陷入困境。我试过等上10分钟,看看它是否会脱钩,但没有。
即使这两个步骤是相反的,即消费者试图订阅一个已经存在的主题,该主题已经有了一条消息,但行为是相同的(但是组总是新的)。
更多细节
消费者正在轮询,超时时间为2秒,如果没有结果,它将循环。
虽然这个主题不存在, poll() 退货 None . 主题存在后, poll() 返回一个 msg 谁的 error().code()_PARTITION_EOF .
当消费者似乎被困住的时候,我问Kafka这是怎么回事 mygroup ,它告诉我:

root@e7b124b4039c:/# /usr/local/kafka/bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group mygroup --describe
Note: This will not show information about old Zookeeper-based consumers.

TOPIC                          PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG        CONSUMER-ID                                       HOST                           CLIENT-ID
root@e7b124b4039c:/#

我试着把另一个不存在的主题读成 mygroup :

root@e7b124b4039c:/# /usr/local/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --group mygroup --topic nonexistent --from-beginning
[2018-03-15 16:36:59,369] WARN [Consumer clientId=consumer-1, groupId=pixelprocessor] Error while fetching metadata with correlation id 2 : {nonexistent=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)
^CProcessed a total of 0 messages
root@e7b124b4039c:/#

在我这么做之后,Kafka要说的是 mygroup :

root@e7b124b4039c:/# /usr/local/kafka/bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group mygroup --describe
Note: This will not show information about old Zookeeper-based consumers.

TOPIC                          PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG        CONSUMER-ID                                       HOST                           CLIENT-ID
mytopic                        0          -               1               -          rdkafka-a172d013-08e6-4ee2-92f3-fdb07d163d57      /172.20.0.6                    rdkafka
(another topic)                0          -               0               -          rdkafka-a172d013-08e6-4ee2-92f3-fdb07d163d57      /172.20.0.6                    rdkafka
(a third topic)                0          -               0               -          rdkafka-a172d013-08e6-4ee2-92f3-fdb07d163d57      /172.20.0.6                    rdkafka
nonexistent                    0          0               0               0          -                                                 -                              -

这是Kafka1.0.1,librdkafka 0.11.3,合流Kafka0.11.0,在ubuntu16.04 dockers上(带有操作系统打包的zookeeper 3.4.8),运行在debian stretch(9.4)和linux 4.9.0-6-amd64上。

yk9xbfzb

yk9xbfzb1#

问题似乎出在 Consumer() 论据。这不能正常工作:

self.kafka_consumer = confluent_kafka.Consumer({
    'group.id': 'mygroup',
    'bootstrap.servers': 'kafka:9092',
    'auto.offset.reset': 'earliest',
})

但事实上:

self.kafka_consumer = confluent_kafka.Consumer({
    'group.id': 'mygroup',
    'bootstrap.servers': 'kafka:9092',
    'default.topic.config': {
        'auto.offset.reset': 'earliest',
    },
})

相关问题