我正在尝试使用Kafka to Storm来模拟流流量。我使用KafkaSpout来读取一个主题中的消息,该消息是由读取这些推文并将其发送到一个主题的Producer发送的。我的问题是,拓扑消耗了此主题中发送的所有推文后,它会继续读取该主题中的消息两次。我如何阻止KafkaSpout阅读两次?(复制因子设置为1)
dz6r00yl1#
我觉得配置没问题。也许问题出在双重 Package 上。确保在execute中每个元组只 Package 一次。正如评论中提到的,请考虑升级到较新的Kafka版本,以及切换到storm-kafka-client。还有一件事可能会让你的生活轻松一点:考虑扩展BaseBasicBolt而不是BaseRichBolt。如果运行execute没有抛出错误,BaseBasicBolt会自动为您确认元组。如果您想使元组失败,可以抛出FailedException。BaseRichBolt应该只在您想做更复杂的确认时使用,例如,在确认之前从内存中的许多execute调用中聚合元组。
execute
storm-kafka-client
BaseBasicBolt
BaseRichBolt
FailedException
1条答案
按热度按时间dz6r00yl1#
我觉得配置没问题。
也许问题出在双重 Package 上。确保在
execute
中每个元组只 Package 一次。正如评论中提到的,请考虑升级到较新的Kafka版本,以及切换到
storm-kafka-client
。还有一件事可能会让你的生活轻松一点:考虑扩展
BaseBasicBolt
而不是BaseRichBolt
。如果运行execute
没有抛出错误,BaseBasicBolt
会自动为您确认元组。如果您想使元组失败,可以抛出FailedException
。BaseRichBolt
应该只在您想做更复杂的确认时使用,例如,在确认之前从内存中的许多execute
调用中聚合元组。