我有一个kafka代理,它运行在云中的某个地方,当我试图通过命令行消费者工具使用它时,我可以使用消息。但是,当我把相同的终点放在我的阿克卡流Kafka消费设置它不工作。
例如:-bin/kafka-console-consumer.sh--zookeeperhostname:2181//yyy --topic name这对我有用。但同样的事情,当我通过消费者设置它不工作
val consumerSettings = ConsumerSettings(actorSystem, new ByteArrayDeserializer, new StringDeserializer)
.withBootstrapServers("hostname:2181/xxx/yyy")
.withGroupId(groupID)
.withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
Consumer.committableSource(consumerSettings, Subscriptions.topics("topic-name"))
1条答案
按热度按时间inkz8wg91#
如果指定--zookeeper,则使用的是旧的(不安全的)使用者。如果基于云的kafka代理设置正确,您应该能够使用console consumer--bootstrap server param和no--zookeeper进行消费。如果这样做不起作用,那么集群可能没有使用external adverted.listeners ip或主机名正确设置。您可能还希望防火墙关闭对zookeeper的直接客户端访问。