我目前正在开发StormVersionUpProject0.9.6->1.0.2我的喷口没有从最新的偏移量开始读取,即使在喷口配置构造函数中使用相同的喷口id。哦,我没有删除zookeeper数据,只是删除了storm数据。
我改变了我的项目配置和源代码如下1。storm core和storm kafka版本将pom.xml中的0.9.6更改为1.0.2,kafkaè2.10更改为0.8.2.2。2更改包路径-backtype->org.apache-storm.kafka->org.apache.storm.kafka 3。在自定义方案中将序列化代码byte[]更改为bytebuffer(我使用了avro类)
我开始拓扑和我的喷口消费者开始读取特定的偏移值。我的拓扑读取这样的数据,即使重新启动拓扑,每次重新启动都会消耗相同的数据。
数据1->数据2->数据3(重新启动拓扑)数据1->数据2->数据3
在更改版本之前,像这样正确地使用下一个数据,即使重新启动拓扑。数据1->数据2->数据3(重启拓扑)数据4->数据5->数据6(重启拓扑)数据7->数据8->数据9
我没有更改spoutconfig中的zkroot值(我认为这意味着kafka组id)如何解决这个问题?
谢谢和问候
1条答案
按热度按时间iaqfqrcu1#
选中spoutconfig.startoffsetime它可以是kafka.api.offsetrequest.latesttime()或kafka.api.offsetrequest.earliesttime()