我正试图阅读Kafka中最早/最古老的信息,这是以前没有读过的。
import json
from kafka import KafkaConsumer
from kafka import KafkaProducer
ORDER_KAFKA_TOPIC = "order_details"
ORDER_CONFIRMED_KAFKA_TOPIC = "order_confirmed"
consumer = KafkaConsumer(
ORDER_KAFKA_TOPIC,
bootstrap_servers="localhost:29092",
auto_offset_reset = "earliest",
enable_auto_commit=False,
)
print("Gonna start listening")
while True:
for message in consumer:
print("Ongoing transaction..")
consumed_message = json.loads(message.value.decode())
print(consumed_message)
我已经设置了参数:
auto_offset_reset = "earliest",
enable_auto_commit=False,
根据文件:
创建使用者时,enable_auto_commit选项设置为False。这意味着使用者不会自动提交其偏移量,因此它将仅接收其他使用者尚未提交的消息。这允许使用者仅读取主题中的未读消息。
但是,我仍然收到已阅读的消息。
1条答案
按热度按时间j8ag8udp1#
您需要为
auto.offset.reset
config指定一个使用者组id才能执行任何操作。另外,“未读”是一个用词不当,它应该是“未提交”* 在使用者组 * 中,例如,一个组可以读消息,但不能控制另一个组是否看到消息。
记录在使用时不会被删除。如果这是您想要的预期行为,那么要么Kafka不是正确的工具,要么您需要自己跟踪消息的唯一性。