storm不受控制的tuple multilikation

qq24tv8q  于 2021-06-02  发布在  Hadoop
关注(0)|答案(1)|浏览(333)

我试图把Kafka的数据通过风暴在hdfs和Hive。我在和hortonworks一起工作。因此,我有以下结构,正如在许多教程中看到的(稍加修改)(http://henning.kropponline.de/2015/01/24/hive-streaming-with-storm/):

TopologyBuilder builder = new TopologyBuilder();

 builder.setSpout("kafka-spout", kafkaSpout);

 builder.setBolt("hdfs-bolt", hdfsBolt).globalGrouping("kafka-spout");

 builder.setBolt("parse-bolt", new ParseBolt()).globalGrouping("kafka-spout");

 builder.setBolt("hive-bolt", hiveBolt).globalGrouping("parse-bolt");

我将kafka喷口数据直接发送到hdfs-bolt,当我只使用hdfs-bolt时,它就工作了。当我添加parse bolt来解析kafka数据并将其发送到hivebolt时,整个系统都会变得疯狂。即使我只是通过Kafka发送一条消息,这个消息也会被Kafka喷口无限次地复制,并被无限次地写入hdfs。
如果parse bolt中有错误,hdfs bolt不应该仍然正常工作吗?我对这个主题不太熟悉,有人能看出初学者的一个简单错误吗?我很感激你的建议。

wnavrhmk

wnavrhmk1#

你是在两个博尔特执行完毕后确认消息吗?
当你从Kafka喷口读到同一个流时,消息将被锚定到同一个喷口,但是有唯一的消息ID。因此,基本上,即使您的parse bolt的元组失败,因为它被锚定到同一个喷口,它将在喷口处被重放。这将导致另一个元组具有不同的messageid,但为订阅它的所有bolt播放相同的内容,在您的示例中是parse bolt和hdfs bolt。请记住,重放发生在喷口,因此从喷口订阅到该流的所有内容都将获得冗余消息。

相关问题