我们有一个使用kafka消息的java应用程序,使用 org.apache.kafka.clients.consumer.KafkaConsumer
我们已经创建了一个带有spring-kafka依赖项的spring引导应用程序,但是无法读取新项目中的消息。我们已经检查了明显的参数,包括引导服务器的主机名和端口(日志显示可以识别)、组、主题以及spring boot与原始使用者一样使用的参数 StringDeserializer
. 这是我们的配置文件:
spring:
kafka:
bootstrap-servers: hostname1:9092,hostname2:9092
consumer:
auto-offset-reset: earliest
group-id: our_group
enable-auto-commit: false
fetch-max-wait: 500
max-poll-records: 1
kafka:
topic:
boot: topic.name
以及接收者:
@Component
public class Receiver {
private static final Logger LOGGER = LoggerFactory.getLogger(Receiver.class);
private CountDownLatch latch = new CountDownLatch(1);
public CountDownLatch getLatch() {
return latch;
}
@KafkaListener(topics = "${kafka.topic.boot}")
public void receive(ConsumerRecord<?, ?> consumerRecord) {
LOGGER.info("received payload='{}'", consumerRecord.toString());
latch.countDown();
}
}
以下是启动启动应用程序的代码:
@SpringBootApplication
public class EmsDemoUsingSpringBootApplication {
public static void main(String[] args) {
SpringApplication.run(EmsDemoUsingSpringBootApplication.class, args);
}
}
正在捕获此异常:
org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 60000 ms.
有什么明显的我忽略了吗?最好的调试方法是什么?
谢谢
2条答案
按热度按时间7dl7o3gd1#
我也有这个问题,发生的是,我无法连接到服务器。您可以在中更改日志级别
application.properties
或者application.yml
查看更多详细信息。恶魔在日志里。。我被告知,Kafka无法处理名称查找和从我的经验,主机连接应该几乎总是fqdn名称(域名和所有)。在我的例子中,我认为我没有在我的虚拟框中设置域,并且不可能找到我的访客框,即使我们在同一个子网中并且
ping
作品。另外,我为Kafka的部分创建了另一个主类,结果证明是错误的。这不是一个好的实践,您应该用
@EnableKafka
把这些设置放在yml文件中,就可以加载了。不需要另一个配置类。我的消费者:
我的申请:
我的姓名:
并启动应用程序。这就是全部。
xam8gpfp2#
我希望你错过了
KafkaListenerContainerFactory
需要在中指定的bean@Configuration
文件也可供消费者指定
KafkaListenerContainerFactory
就像@KafkaListener(topics = ("${kafka.topic.boot}"), containerFactory = "kafkaManualAckListenerContainerFactory"