我试图用下面的代码将Kafka整合到StormTopology中,但不幸的是,Kafka并没有使用来自Kafka主题的消息。在storm ui核心,发射计数永远保持为0。
String bootStrapServer = "10.20.10.238:9092";
String topic = "test.topic";
KafkaSpoutConfig.Builder spoutConfigBuilder = KafkaSpoutConfig.builder(bootStrapServer,topic);
spoutConfigBuilder.setProp(ConsumerConfig.RECEIVE_BUFFER_CONFIG,100*1024*1024);
spoutConfigBuilder.setProp(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG,100*1024*1024);
spoutConfigBuilder.setProcessingGuarantee(KafkaSpoutConfig.ProcessingGuarantee.AT_LEAST_ONCE);
Boolean readFromStart = true;
if(readFromStart) {
spoutConfigBuilder.setFirstPollOffsetStrategy(FirstPollOffsetStrategy.EARLIEST);
}
else {
spoutConfigBuilder.setFirstPollOffsetStrategy(FirstPollOffsetStrategy.LATEST);
}
KafkaSpout spout = new KafkaSpout(spoutConfigBuilder.build());
builder.setSpout("kafkaSpout", spout, 1);
// And a Bolt to see messages
builder.setBolt("fcBolt", new FcBolt(), 1).setNumTasks(1).shuffleGrouping("kafkaSpout");
但是,当我尝试从cli查看生成的消息时,我可以使用以下命令查看有关主题的所有消息:
bin/kafka-console-consumer.sh --topic test.topic --from-beginning --bootstrap-server 10.20.10.238:9092
Picked up _JAVA_OPTIONS: -Xmx128000m
test
test
test1
....
版本:
Storm : 2.2.0
Kafka : 2.13_2.6.0
在旧版本中,它运行良好!一些我在新版本中没有读到的东西。
谢谢你的帮助。提前谢谢!
1条答案
按热度按时间tzdcorbm1#
很难知道你有什么,所以也考虑一下显示你的代码的其余部分。但从你所拥有的来看,它并不像你在制造任何事件。
如果您正试图使用您的喷口中的kafka事件进行进一步处理,那么请确保您实际订阅了一个主题,该主题上正在创建事件,并且您无法通过控制台使用者看到事件输出,因为您是在storm中使用它们,而不是生成它们。
如果您试图通过storm为测试主题生成kafka事件,然后试图通过控制台使用者使用它们,那么请确保您实际上是在storm中生成事件。
希望这能让你走上正确的道路,我建议你在这里回顾一下Kafka的基本概念:Kafka简介