Versions:
"org.apache.storm" % "storm-kafka-client" % "1.2.1"
"org.apache.storm" % "storm-core" % "1.2.1" % "compile"
Kafka: 0.10.1.0
I am getting the following errors/warnings from the Kafka spout when running in a local cluster:
2018-06-28 00:00:34,930 AppInfoParser [INFO] Kafka version : 0.10.1.0
2018-06-28 00:00:34,930 AppInfoParser [INFO] Kafka commitId : 3402a74efb23d1d4
2018-06-28 00:00:34,931 WARN NetworkClient [Thread-40-KafkaSpout-executor[12 12]] Bootstrap broker ip1:9092 disconnected
2018-06-28 00:00:35,092 WARN NetworkClient [Thread-40-KafkaSpout-executor[12 12]] Bootstrap broker ip2:9092 disconnected
2018-06-28 00:00:35,251 WARN NetworkClient [Thread-40-KafkaSpout-executor[12 12]] Bootstrap broker ip3:9092 disconnected
2018-06-28 00:00:35,524 WARN NetworkClient [Thread-40-KafkaSpout-executor[12 12]] Bootstrap broker ip4:9092 disconnected
2018-06-28 00:00:35,629 WARN NetworkClient [Thread-40-KafkaSpout-executor[12 12]] Bootstrap broker ip5:9092 disconnected
2018-06-28 00:00:35,822 WARN NetworkClient [Thread-40-KafkaSpout-executor[12 12]] Bootstrap broker ip6:9092 disconnected
2018-06-28 00:00:35,927 WARN NetworkClient [Thread-40-KafkaSpout-executor[12 12]] Bootstrap broker ip7:9092 disconnected
Here is the code for the Kafka spout:
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.storm.kafka.spout.{KafkaSpout, KafkaSpoutConfig}

private def getKafkaSpoutConfig(source: TopologyConfig) = {
  // Tried this after getting the error; it had no impact.
  System.clearProperty("java.security.auth.login.config")
  KafkaSpoutConfig.builder("ip1:9092,ip2:9092,ip3:9092,.....,ip10:9092", "topicName")
    .setProp(ConsumerConfig.GROUP_ID_CONFIG, "myConsumerGroup")
    .setProp(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer")
    .setOffsetCommitPeriodMs(100)
    .setFirstPollOffsetStrategy(KafkaSpoutConfig.FirstPollOffsetStrategy.UNCOMMITTED_LATEST)
    .setMaxUncommittedOffsets(1000000)
    .build()
}

def getKafkaSpout(source: TopologyConfig) = new KafkaSpout(getKafkaSpoutConfig(source))
While debugging, I found the following stack trace at the point where the warning is raised:
maybeHandleDisconnection:568, NetworkClient$DefaultMetadataUpdater (org.apache.kafka.clients)
processDisconnection:396, NetworkClient (org.apache.kafka.clients)
handleDisconnections:464, NetworkClient (org.apache.kafka.clients)
poll:270, NetworkClient (org.apache.kafka.clients)
poll:232, ConsumerNetworkClient (org.apache.kafka.clients.consumer.internals)
poll:195, ConsumerNetworkClient (org.apache.kafka.clients.consumer.internals)
getTopicMetadata:253, Fetcher (org.apache.kafka.clients.consumer.internals)
partitionsFor:1318, KafkaConsumer (org.apache.kafka.clients.consumer)
getFilteredTopicPartitions:57, NamedTopicFilter (org.apache.storm.kafka.spout)
refreshAssignment:54, ManualPartitionSubscription (org.apache.storm.kafka.spout)
subscribe:49, ManualPartitionSubscription (org.apache.storm.kafka.spout)
subscribeKafkaConsumer:657, KafkaSpout (org.apache.storm.kafka.spout)
activate:648, KafkaSpout (org.apache.storm.kafka.spout)
invoke:484, util$async_loop$fn__557 (org.apache.storm)
run:22, AFn (clojure.lang)
run:748, Thread (java.lang)
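The same code works against one Kafka setup, but against another Kafka setup of the same version it produces the warnings above.
Edit:
As par suggested, I tried connecting to Kafka on port 9092, and the connection succeeded: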
➜ git:(myBranch) ✗ telnet ipn 9092
Trying ipn...
Connected to my-kafka-app-396433.
Escape character is '^]'.
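The telnet check above covers a single broker. The following is a minimal sketch of the same TCP reachability check run against every broker in the bootstrap list. The object name BrokerReachability, the 2-second timeout, and the default host list are assumptions made for illustration, and entries are assumed to be in host:port form. A successful connect only shows that the port is open; it does not show that the client and broker speak compatible protocol versions.

```scala
import java.net.{InetSocketAddress, Socket}
import scala.util.Try

// Hypothetical helper, not part of Storm or Kafka.
object BrokerReachability {

  // Parse "host1:port1,host2:port2" into (host, port) pairs.
  // Assumes every entry has the form host:port.
  def parseBrokers(bootstrap: String): Seq[(String, Int)] =
    bootstrap.split(",").toSeq.map(_.trim).filter(_.nonEmpty).map { entry =>
      val idx = entry.lastIndexOf(':')
      (entry.substring(0, idx), entry.substring(idx + 1).toInt)
    }

  // True if a TCP connection can be opened within timeoutMs (assumed default: 2000).
  def isReachable(host: String, port: Int, timeoutMs: Int = 2000): Boolean =
    Try {
      val socket = new Socket()
      try socket.connect(new InetSocketAddress(host, port), timeoutMs)
      finally socket.close()
    }.isSuccess

  def main(args: Array[String]): Unit = {
    // Placeholder broker list; pass the real bootstrap servers as the first argument.
    val bootstrap = args.headOption.getOrElse("ip1:9092,ip2:9092")
    parseBrokers(bootstrap).foreach { case (host, port) =>
      println(s"$host:$port reachable=${isReachable(host, port)}")
    }
  }
}
```

If every broker is reachable at the TCP level and the "Bootstrap broker ... disconnected" warnings persist, the problem is more likely above the network layer.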
1 Answer
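This was caused by a Kafka version mismatch. The installed Kafka version was 0.10.0.1, while the code was picking up and running with Kafka client version 0.10.1.0. This happened because storm-core depends on kafka-clients version 0.10.1.0. That dependency can be overridden, which I did, but somehow it was not being excluded properly in sbt. After trying a few permutations it started working, and the final dependencies look like this: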