我对Kafka和多克完全陌生,有一个问题要解决。我们针对kafka(apache)队列的持续集成测试在本地计算机上运行良好,但在jenkins ci服务器上时,偶尔会出现以下错误:
%3|1508247800.270|FAIL|art#producer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: Connect to ipv4#127.0.0.1:9092 failed: Connection refused
%3|1508247800.270|ERROR|art#producer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: Connect to ipv4#127.0.0.1:9092 failed: Connection refused
%3|1508247800.270|ERROR|art#producer-1| [thrd:localhost:9092/bootstrap]: 1/1 brokers are down
工作原理是docker的形象需要时间来开始,到那时Kafka制作人已经放弃了。违规代码是
producer_properties = {
'bootstrap.servers': self._job_queue.bootstrap_server,
'client.id': self._job_queue.client_id
}
try:
self._producer = kafka.Producer(**producer_properties)
except:
print("Bang!")
上面的错误行出现在生产者的创建中。但是,不会引发异常,并且调用会返回一个外观有效的生产者,因此我无法以编程方式测试代理端点的存在性。是否有用于检查代理状态的api?
2条答案
按热度按时间jgwigjjp1#
这是似乎对我有用的代码。如果它看起来有点像弗兰肯斯坦,那么你是对的,它是!如果有一个干净的解决方案,我期待着看到它:
bn31dyow2#
如果与代理的连接失败,客户端似乎不会抛出异常。实际上,当生产者第一次尝试发送消息时,它会尝试连接到引导服务器。如果连接失败,它会反复尝试连接到引导列表中传递的任何代理。最后,如果代理出现,send就会发生(我们可以在回调函数中检查状态)。合流kafka python库正在使用librdkafka库,而此客户端似乎没有正确的文档。某些由kafka协议指定的kafka生产者选项似乎不受librdkafka支持。
下面是我使用的回调示例代码:
另外,检查代理是否存在,您可以只通过telnet连接到代理端口上的代理ip(在本例中是9092)。在kafka集群使用的zookeeper上,您可以检查/brokers/ids下znode的内容