Python Kafka只读一次消息

0tdrvxhp  于 2022-12-17  发布在  Apache
关注(0)|答案(1)|浏览(167)

我正试图阅读Kafka中最早/最古老的信息,这是以前没有读过的。

import json

from kafka import KafkaConsumer
from kafka import KafkaProducer

ORDER_KAFKA_TOPIC = "order_details"
ORDER_CONFIRMED_KAFKA_TOPIC = "order_confirmed"

consumer = KafkaConsumer(
    ORDER_KAFKA_TOPIC,
    bootstrap_servers="localhost:29092",
    auto_offset_reset = "earliest",
    enable_auto_commit=False,

)

print("Gonna start listening")
while True:
    for message in consumer:

        print("Ongoing transaction..")
        consumed_message = json.loads(message.value.decode())
        print(consumed_message)

我已经设置了参数:

auto_offset_reset = "earliest",
    enable_auto_commit=False,

根据文件:
创建使用者时,enable_auto_commit选项设置为False。这意味着使用者不会自动提交其偏移量,因此它将仅接收其他使用者尚未提交的消息。这允许使用者仅读取主题中的未读消息。
但是,我仍然收到已阅读的消息。

j8ag8udp

j8ag8udp1#

您需要为auto.offset.reset config指定一个使用者组id才能执行任何操作。
另外,“未读”是一个用词不当,它应该是“未提交”* 在使用者组 * 中,例如,一个组可以读消息,但不能控制另一个组是否看到消息。
记录在使用时不会被删除。如果这是您想要的预期行为,那么要么Kafka不是正确的工具,要么您需要自己跟踪消息的唯一性。

相关问题