kafka消费者?

evrscar2  于 2021-06-08  发布在  Kafka
关注(0)|答案(3)|浏览(422)

我正在做python kafka consumer(尝试在中使用kafka.consumer.simpleconsumer或kafka.consumer.simpleconsumer)http://kafka-python.readthedocs.org/en/latest/apidoc/kafka.consumer.html). 当我运行下面的代码时,它将一直运行,即使所有消息都已被消耗。我希望消费者会停止,如果它消费所有的信息。怎么做?另外,我也不知道如何使用stop()函数(在基类kafka.consumer.base.consumer中)。
更新
我使用信号处理程序调用consumer.stop()。一些错误信息被打印到屏幕上。但是这个程序仍然停留在for循环中。当新的信息进来时,消费者就把它们消费掉并打印出来。我还尝试了client.close()。但结果是一样的。
我需要一些方法来优雅地停止for循环。

client = KafkaClient("localhost:9092")
        consumer = SimpleConsumer(client, "test-group", "test")

        consumer.seek(0, 2)# (0,2) and (0,0)

        for message in consumer:
            print "Offset:", message.offset
            print "Value:", message.message.value

欢迎任何帮助。谢谢。

ht4b089n

ht4b089n1#

与mohit的答案类似,但使用 end_offsets 消费者的功能。

from kafka import KafkaConsumer, TopicPartition

# settings

client = "localhost:9092"
topic = 'test'

# prepare consumer

tp = TopicPartition(topic,0)
consumer = KafkaConsumer(client)
consumer.assign([tp])
consumer.seek_to_beginning(tp)  

# obtain the last offset value

lastOffset = consumer.end_offsets([tp])[tp]

for message in consumer:
    print "Offset:", message.offset
    print "Value:", message.message.value
    if message.offset == lastOffset - 1:
        break
xbp102n0

xbp102n02#

我们可以先检查主题中最后一条消息的偏移量。当我们到达偏移量时,停止循环。

client = "localhost:9092"
    consumer = KafkaConsumer(client)
    topic = 'test'
    tp = TopicPartition(topic,0)
    #register to the topic
    consumer.assign([tp])

    # obtain the last offset value
    consumer.seek_to_end(tp)
    lastOffset = consumer.position(tp)

    consumer.seek_to_beginning(tp)        

    for message in consumer:
        print "Offset:", message.offset
        print "Value:", message.message.value
        if message.offset == lastOffset - 1:
            break
vaj7vani

vaj7vani3#

使用iter\u timeout参数设置等待时间。如果设置为10,就像下面的代码一样,如果10秒内没有新消息出现,它将退出。默认值为none,这意味着即使没有新消息传入,使用者也会在此处阻止。

self.consumer = SimpleConsumer(self.client, "test-group", "test",
                iter_timeout=10)

更新
以上不是一个好方法。当大量消息传入时,很难设置足够小的iter\u超时来保证停止。所以,现在,我使用get\u message()函数,它尝试使用一条消息并停止。如果没有新消息,则返回none。

相关问题