我正在尝试实现github项目(https://github.com/tomatotomahto/cdh-sensor-analytics)通过cloudera data science workbench在我们的内部hadoop集群上。
在cloudera data science workbench上运行项目时,我在尝试通过python-api kafkaproducer(bootstrap\u servers='broker1:9092')连接到kafka时遇到错误“no brokers available”(代码可以在中找到)https://github.com/tomatotomahto/cdh-sensor-analytics/blob/master/datagenerator/kafkaconnection.py].
我已经使用kerberos进行了身份验证。我尝试过给代理节点不带端口号,也作为一个列表。但是,到目前为止还没有任何进展。
下面是堆栈跟踪。
NoBrokersAvailable: NoBrokersAvailable
NoBrokersAvailable Traceback (most recent call
last)
in engine
----> 1 dgen = DataGenerator(config)
/home/cdsw/datagenerator/DataGenerator.py in __init__(self, config)
39
40 self._kudu = KuduConnection(self._config['kudu_master'],
self._config['kudu_port'], spark)
---> 41 self._kafka =
KafkaConnection(self._config['kafka_brokers'],
self._config['kafka_topic'])
42
43 #self._kafka
/home/cdsw/datagenerator/KafkaConnection.py in __init__(self, brokers,
topic)
4 class KafkaConnection():
5 def __init__(self, brokers, topic):
----> 6 self._kafka_producer =
KafkaProducer(bootstrap_servers=brokers)
7 self._topic = topic
8
/home/cdsw/.local/lib/python3.6/site-packages/kafka/producer/kafka.py
in __init__(self,**configs)
333
334 client = KafkaClient(metrics=self._metrics,
metric_group_prefix='producer',
--> 335 **self.config)
336
337 # Get auto-discovered version from client if necessary
/home/cdsw/.local/lib/python3.6/site-packages/kafka/client_async.py in
__init__(self,**configs)
208 if self.config['api_version'] is None:
209 check_timeout =
self.config['api_version_auto_timeout_ms'] / 1000
--> 210 self.config['api_version'] =
self.check_version(timeout=check_timeout)
211
212 def _bootstrap(self, hosts):
/home/cdsw/.local/lib/python3.6/site-packages/kafka/client_async.py in
check_version(self, node_id, timeout, strict)
806 try_node = node_id or self.least_loaded_node()
807 if try_node is None:
--> 808 raise Errors.NoBrokersAvailable()
809 self._maybe_connect(try_node)
810 conn = self._conns[try_node]
NoBrokersAvailable: NoBrokersAvailable
我还尝试通过cli通过vpn连接在工作台之外。我也犯了同样的错误。
关于我遗漏了什么有什么建议吗?提前谢谢!
1条答案
按热度按时间wmtdaxz31#
第一步是确定网络路由是否打开,代理是否启动并侦听该端口。之后,您可以检查身份验证等。
你试过了吗
telnet <broker host> 9092
?您可能需要显式设置
advertised.listeners
除了listeners
,我偶尔会看到java有一个奇怪的怪癖,它没有绑定到预期的网络接口(或者至少是我预期的网络接口!)我不得不用advertised.listeners
.