使用Kafka0.8.1.1及其各自的Zookeeper和Kafka风暴的当前代码https://github.com/apache/incubator-storm(上次提交:7ac24b893c)我有以下错误:
如果我创建两个主题(topica和topicb)并将数据发送到topica(将topica的偏移量更改为offseta)并为topica打开一个kafka喷口,那么一切都正常。但是,如果我为topicb打开一个喷口,喷口将检索topica的偏移量,从而导致与此错误偏移量相关的异常(offseta而不是offsetb)。
检查代码:https://github.com/apache/incubator-storm/blob/master/external/storm-kafka/src/jvm/storm/kafka/partitionmanager.java#l103,正在检索正确的offsetb,但没有使用此变量,而是使用在json中找到的偏移量。这个json可以在/id/partition\u 0(一个zookeeper url)中找到,其中存储了两个主题。
如果我从zookeeper获得分区0的数据,我会收到以下信息(为了可读性而美化):
$ get /id/partition_0
{
"topology": {
"id": "58e1c603-0ab0-4344-a614-204e4455e90a",
"name": "Start test storm"
},
"offset": 1,
"partition": 0,
"broker": {
"host": "XXX.XXX.XXX.XXX.compute.amazonaws.com",
"port": 9092
},
"topic": "topicA"
}
...
如你所见,是给topica的偏移量。这是我不理解的行为:如果我从两个主题的相同url(/id/partition\u 0)检索偏移量,很明显,对于某些主题,会检索到错误的偏移量。在我的Kafka配置中有什么配置错误吗?是其他原因导致这个错误吗?
我修改了该类(partitionmanager构造函数)中的代码,以便忽略/id/partition 0中的json,并使用kafkautils.getoffset(\u consumer,spoutconfig.topic,id.partition,spoutconfig)中的偏移量;我的数据流运行正常。
如果有帮助的话,我还将为这两个主题添加zookeeper信息。
$ get /brokers/topics/topicA
{"version":1,"partitions":{"0":[0]}}
...
$ get /brokers/topics/topicB
{"version":1,"partitions":{"0":[0]}}
...
暂无答案!
目前还没有任何答案,快来回答吧!