重新启动Storm时,再次处理Kafka的所有预处理记录

uoifb46i  于 2022-12-09  发布在  Apache
关注(0)|答案(2)|浏览(204)

我正在从Kafka consumer阅读数据到Storm spout。但是,当我重新启动Storm时,它也会从Kafka读取以前处理过的记录。在重新启动时,我不想处理以前处理过的记录。下面是我的代码:

public class KafkaStormSample {
    public static void main(String[] args) throws Exception {

        SpoutConfig kafkaSpoutConfig = new SpoutConfig(hosts, topic, "/" + topic, UUID.randomUUID().toString());
        kafkaSpoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());

        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("kafka-spout", new KafkaSpout(kafkaSpoutConfig));
        builder.setBolt("word-spitter", new SplitBolt()).shuffleGrouping("kafka-spout");

        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("KafkaStormSample", config, builder.createTopology());
    }
}
uidvcgyl

uidvcgyl1#

沿着静态UUID,您还可以使用StormSubmitter提交要在Storm集群上运行的拓扑。更多信息here

0s7z1bwu

0s7z1bwu2#

问题是你为SpoutConfig使用的随机UUID。相反,选择一个固定的字符串并每次使用它。
不相关:你不应该用storm-kafka来写新代码。用storm-kafka-client来代替。

相关问题