我试图把Kafka的数据通过风暴在hdfs和Hive。我在和hortonworks一起工作。因此,我有以下结构,正如在许多教程中看到的(稍加修改)(http://henning.kropponline.de/2015/01/24/hive-streaming-with-storm/):
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("kafka-spout", kafkaSpout);
builder.setBolt("hdfs-bolt", hdfsBolt).globalGrouping("kafka-spout");
builder.setBolt("parse-bolt", new ParseBolt()).globalGrouping("kafka-spout");
builder.setBolt("hive-bolt", hiveBolt).globalGrouping("parse-bolt");
我将kafka喷口数据直接发送到hdfs-bolt,当我只使用hdfs-bolt时,它就工作了。当我添加parse bolt来解析kafka数据并将其发送到hivebolt时,整个系统都会变得疯狂。即使我只是通过Kafka发送一条消息,这个消息也会被Kafka喷口无限次地复制,并被无限次地写入hdfs。
如果parse bolt中有错误,hdfs bolt不应该仍然正常工作吗?我对这个主题不太熟悉,有人能看出初学者的一个简单错误吗?我很感激你的建议。
1条答案
按热度按时间wnavrhmk1#
你是在两个博尔特执行完毕后确认消息吗?
当你从Kafka喷口读到同一个流时,消息将被锚定到同一个喷口,但是有唯一的消息ID。因此,基本上,即使您的parse bolt的元组失败,因为它被锚定到同一个喷口,它将在喷口处被重放。这将导致另一个元组具有不同的messageid,但为订阅它的所有bolt播放相同的内容,在您的示例中是parse bolt和hdfs bolt。请记住,重放发生在喷口,因此从喷口订阅到该流的所有内容都将获得冗余消息。