使用kafkaspout和storm,如何忽略旧消息?

tv6aics1  于 2021-06-21  发布在  Storm
关注(0)|答案(1)|浏览(286)

出于调试目的,在启动拓扑时,我希望忽略启动前kafka队列中的所有消息。我相信这可以通过设置 spoutConfig.startOffsetTime 以及 spoutConfig.useStartOffsetTimeIfOffsetOutOfRange . 但是我试着把它们分别设置为-1、-2、-3和true/false的每一个排列。我的拓扑结构继续消耗Kafka的能量(没有任何东西会向Kafka(kafka)发布新消息。
有没有配置可以用来忽略旧消息?从本质上说,清除Kafka队列?

gblwokeq

gblwokeq1#

您是否正确配置了zookeeper?最后的偏移量可以存储在那里 KafkaSpout 启动时可以读取zookeeper的最后一个偏移量。有关更多详细信息,请查看此处:
https://storm.apache.org/documentation/storm-kafka.html
http://www.michael-noll.com/blog/2014/05/27/kafka-storm-integration-example-tutorial/

相关问题