尝试通过cloudera data science workbench连接到kafka时没有可用的代理

vuv7lop3  于 2021-06-08  发布在  Kafka
关注(0)|答案(1)|浏览(314)

我正在尝试实现github项目(https://github.com/tomatotomahto/cdh-sensor-analytics)通过cloudera data science workbench在我们的内部hadoop集群上。
在cloudera data science workbench上运行项目时,我在尝试通过python-api kafkaproducer(bootstrap\u servers='broker1:9092')连接到kafka时遇到错误“no brokers available”(代码可以在中找到)https://github.com/tomatotomahto/cdh-sensor-analytics/blob/master/datagenerator/kafkaconnection.py].
我已经使用kerberos进行了身份验证。我尝试过给代理节点不带端口号,也作为一个列表。但是,到目前为止还没有任何进展。
下面是堆栈跟踪。

NoBrokersAvailable: NoBrokersAvailable
NoBrokersAvailable                        Traceback (most recent call 
last)
in engine
----> 1 dgen = DataGenerator(config)

/home/cdsw/datagenerator/DataGenerator.py in __init__(self, config)
 39         
 40         self._kudu = KuduConnection(self._config['kudu_master'], 
self._config['kudu_port'], spark)
---> 41         self._kafka = 
KafkaConnection(self._config['kafka_brokers'], 
self._config['kafka_topic'])
 42 
 43         #self._kafka

/home/cdsw/datagenerator/KafkaConnection.py in __init__(self, brokers, 
topic)
  4 class KafkaConnection():
  5   def __init__(self, brokers, topic):
----> 6     self._kafka_producer = 
KafkaProducer(bootstrap_servers=brokers)
  7     self._topic = topic
  8     

/home/cdsw/.local/lib/python3.6/site-packages/kafka/producer/kafka.py 
in __init__(self,**configs)
333 
334         client = KafkaClient(metrics=self._metrics, 
metric_group_prefix='producer',
--> 335                            **self.config)
336 
337         # Get auto-discovered version from client if necessary

/home/cdsw/.local/lib/python3.6/site-packages/kafka/client_async.py in 
__init__(self,**configs)
208         if self.config['api_version'] is None:
209             check_timeout = 
self.config['api_version_auto_timeout_ms'] / 1000
--> 210             self.config['api_version'] = 
self.check_version(timeout=check_timeout)
211 
212     def _bootstrap(self, hosts):

/home/cdsw/.local/lib/python3.6/site-packages/kafka/client_async.py in 
check_version(self, node_id, timeout, strict)
806             try_node = node_id or self.least_loaded_node()
807             if try_node is None:
--> 808                 raise Errors.NoBrokersAvailable()
809             self._maybe_connect(try_node)
810             conn = self._conns[try_node]

NoBrokersAvailable: NoBrokersAvailable

我还尝试通过cli通过vpn连接在工作台之外。我也犯了同样的错误。
关于我遗漏了什么有什么建议吗?提前谢谢!

wmtdaxz3

wmtdaxz31#

第一步是确定网络路由是否打开,代理是否启动并侦听该端口。之后,您可以检查身份验证等。
你试过了吗 telnet <broker host> 9092 ?
您可能需要显式设置 advertised.listeners 除了 listeners ,我偶尔会看到java有一个奇怪的怪癖,它没有绑定到预期的网络接口(或者至少是我预期的网络接口!)我不得不用 advertised.listeners .

相关问题