我是kafka0.9的新手,在测试一些特性时,我意识到java实现的消费者中有一种奇怪的行为( KafkaConsumer
).
Kafka代理位于ambari外部机器中。
即使我可以实现一个生产者并开始向外部代理发送消息,我也不知道为什么当消费者试图读取事件(poll)时,它会卡住。
我知道producer工作得很好,因为我确实可以通过console consumer(在ambari上本地工作)使用消息。但是当我执行java消费者时,什么也没发生,只是卡住了。调试代码时,我可以看到它在 poll()
生产线:
ConsumerRecords<String, String> records = consumer.poll(100);
顺便说一句,超时没有任何作用。不管您输入0、100或1000毫秒,消费程序都会被阻塞在这一行中,并且不会超时或抛出异常。
我尝试了所有类型的可选属性,例如advised.host.name、advised.listener,。。。以此类推,零运气。
任何帮助都将不胜感激。提前谢谢!
2条答案
按热度按时间ac1kyiln1#
原因可能是运行用户代码的计算机无法连接到zookeeper。试着在安装了kafka的机器上运行相同的用户代码(我试过这个并为我工作)。我还通过在server.properties文件中提到以下属性来解决问题:
advertised.host.name="ip address which you want to expose"
//在我的情况下,这是公共ip的ec2机器,我有Kafka和zookeeper安装在同一个ec2。advertised.port=9092
ConsumerRecords<String, String> records = consumer.poll(100);
上述声明并不意味着消费者将在100毫秒后超时,这是投票期。它在100毫秒内捕获的任何数据都被读入记录集合。zpjtge222#
在我的例子中,poll()方法最终陷入了无限循环ensurecoordinatorready(),协调器这个词告诉我协调器运行在另一个主机上(出于测试目的,我只在我的/etc/hosts中添加了一个代理主机,而总共有三个代理主机)。所以消费者正确地得到了消费者协调器。
因此解决方案是:在/etc/hosts文件中正确配置运行kafka代理的主机