如何在akka stream kafka consumersettings中指定kafka zookeeper端点

qnyhuwrf  于 2021-06-07  发布在  Kafka
关注(0)|答案(1)|浏览(389)

我有一个kafka代理,它运行在云中的某个地方,当我试图通过命令行消费者工具使用它时,我可以使用消息。但是,当我把相同的终点放在我的阿克卡流Kafka消费设置它不工作。
例如:-bin/kafka-console-consumer.sh--zookeeperhostname:2181//yyy --topic name这对我有用。但同样的事情,当我通过消费者设置它不工作

val consumerSettings = ConsumerSettings(actorSystem, new ByteArrayDeserializer, new StringDeserializer)
  .withBootstrapServers("hostname:2181/xxx/yyy")
  .withGroupId(groupID)
  .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")

Consumer.committableSource(consumerSettings, Subscriptions.topics("topic-name"))
inkz8wg9

inkz8wg91#

如果指定--zookeeper,则使用的是旧的(不安全的)使用者。如果基于云的kafka代理设置正确,您应该能够使用console consumer--bootstrap server param和no--zookeeper进行消费。如果这样做不起作用,那么集群可能没有使用external adverted.listeners ip或主机名正确设置。您可能还希望防火墙关闭对zookeeper的直接客户端访问。

相关问题