kafka python消费者未收到消息

bxjv4tth  于 2021-06-07  发布在  Kafka
关注(0)|答案(5)|浏览(1029)

我在这方面有困难 KafaConsumer 使其从开头读取,或从任何其他显式偏移量读取。
为同一主题的使用者运行命令行工具时,我确实看到了 --from-beginning 选项,否则将挂起

$ ./kafka-console-consumer.sh --zookeeper {localhost:port} --topic {topic_name} --from-beginning

如果我通过python运行它,它会挂起,我怀疑这是由不正确的使用者配置引起的

consumer = KafkaConsumer(topic_name,
                     bootstrap_servers=['localhost:9092'],
                     group_id=None,
                     auto_commit_enable=False,
                     auto_offset_reset='smallest')

print "Consuming messages from the given topic"
for message in consumer:
    print "Message", message
    if message is not None:
        print message.offset, message.value

print "Quit"

输出:

使用来自给定主题的消息(之后挂起)
我使用的是kafkapython0.9.5,代理运行的是kafka8.2。不知道到底是什么问题。
按照dpkp的建议设置\u group \u id=none \u以模拟控制台使用者的行为。

uplii1fm

uplii1fm1#

自动补偿重置为我解决了。

sd2nnvve

sd2nnvve2#

我的观点是:打印并确保偏移量是您所期望的。通过使用 position() 以及 seek_to_beginning() ,请参见代码中的注解。
我无法解释:
为什么示例化之后 KafkaConsumer ,分区没有分配,这是设计的吗?闲逛就是打电话 poll() 以前一次 seek_to_beginning() 为什么有时候之后呢 seek_to_beginning() ,第一次呼叫 poll() 不返回任何数据,也不更改偏移量。
代码:

import kafka
print(kafka.__version__)
from kafka import KafkaProducer, KafkaConsumer
from time import sleep
KAFKA_URL = 'localhost:9092' # kafka broker
KAFKA_TOPIC = 'sida3_sdtest_topic' # topic name

# ASSUMING THAT the topic exist

# write to the topic

producer = KafkaProducer(bootstrap_servers=[KAFKA_URL])
for i in range(20):
    producer.send(KAFKA_TOPIC, ('msg' + str(i)).encode() )
producer.flush()

# read from the topic

# auto_offset_reset='earliest', # auto_offset_reset is needed when offset is not found, it's NOT what we need here

consumer = KafkaConsumer(KAFKA_TOPIC,
bootstrap_servers=[KAFKA_URL],
max_poll_records=2,
group_id='sida3'
)

# (!?) wtf, why we need this to get partitions assigned

# AssertionError: No partitions are currently assigned if poll() is not called

consumer.poll()
consumer.seek_to_beginning()

# also AssertionError: No partitions are currently assigned if poll() is not called

print('partitions of the topic: ',consumer.partitions_for_topic(KAFKA_TOPIC))

from kafka import TopicPartition
print('before poll() x2: ')
print(consumer.position(TopicPartition(KAFKA_TOPIC, 0)))
print(consumer.position(TopicPartition(KAFKA_TOPIC, 1)))

# (!?) sometimes the first call to poll() returns nothing and doesnt change the offset

messages = consumer.poll()
sleep(1)
messages = consumer.poll()

print('after poll() x2: ')
print(consumer.position(TopicPartition(KAFKA_TOPIC, 0)))
print(consumer.position(TopicPartition(KAFKA_TOPIC, 1)))

print('messages: ', messages)

输出:

2.0.1
partitions of the topic:  {0, 1}
before poll() x2: 
0
0
after poll() x2: 
0
2
messages:  {TopicPartition(topic='sida3_sdtest_topic', partition=1): [ConsumerRecord(topic='sida3_sdtest_topic', partition=1, offset=0, timestamp=1600335075864, timestamp_type=0, key=None, value=b'msg0', headers=[], checksum=None, serialized_key_size=-1, serialized_value_size=4, serialized_header_size=-1), ConsumerRecord(topic='sida3_sdtest_topic', partition=1, offset=1, timestamp=1600335075864, timestamp_type=0, key=None, value=b'msg1', headers=[], checksum=None, serialized_key_size=-1, serialized_value_size=4, serialized_header_size=-1)]}
xwmevbvl

xwmevbvl3#

我遇到了同样的问题:我可以在kafka控制台中接收消息,但是无法使用包获取python脚本的消息 kafka-python .
最后我想原因是我没有打电话 producer.flush() 以及 producer.close() 在我的 producer.py 文件中没有提到。

ar5n3qh5

ar5n3qh54#

auto_offset_reset='earliest' 以及 group_id=None 帮我解决了。

rta7y2nd

rta7y2nd5#

控制台使用者和您发布的python使用者代码之间的区别是python使用者使用使用者组来保存偏移: group_id="test-consumer-group" . 相反,如果您设置group_id=none,您将看到与控制台使用者相同的行为。

相关问题