我使用以下Kafka设置:
val kafkaParams = Map[String, Object](
"bootstrap.servers" -> "10.30.3.41:9092,10.30.3.42:9092,10.30.3.43:9092",
"key.deserializer" -> classOf[StringDeserializer],
"value.deserializer" -> classOf[StringDeserializer],
"group.id" -> "123",
"auto.offset.reset" -> "latest",
"enable.auto.commit" -> (false: java.lang.Boolean)
)
所有kafka代理都定义了它们相应的ip地址(如上所示)。
但是,在启动流式处理上下文时,出现以下错误:
16/12/31 01:46:06 DEBUG NetworkClient: Error connecting to node 1 at broker1:9092:
java.io.IOException: Can't resolve address: broker1:9092
at org.apache.kafka.common.network.Selector.connect(Selector.java:171)
at org.apache.kafka.clients.NetworkClient.initiateConnect(NetworkClient.java:498)
at org.apache.kafka.clients.NetworkClient.access$400(NetworkClient.java:48)
...
Caused by: java.nio.channels.UnresolvedAddressException
at sun.nio.ch.Net.checkAddress(Net.java:101)
at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:622)
at org.apache.kafka.common.network.Selector.connect(Selector.java:168)
... 31 more
broker1是我的代理的主机名。由于我没有在群集中设置dns,因此无法从所有节点解析此名称。我可以通过在服务器上正确添加所有代理主机名来解决这个问题 /etc/hosts
跨越所有节点。不幸的是,我真的不想管理 /etc/hosts
,我真的很想理解为什么spark不只是通过它们的ip地址连接到代理,因为我在下面显式地列出了它们 bootstrap.servers
.
1条答案
按热度按时间vshtjzan1#
我相信这是一个更多的问题与您的Kafka配置比Spark。可能
listeners
以及advertised.listeners
未设置或配置为使用主机名。如果真的是这样的话,这些价值观就会向消费者宣传,并导致观察到的行为。将代理配置为使用这些属性的ip地址可以解决以下问题: