无法在spring boot中使用kafka消息

bakd9h0s  于 2021-06-07  发布在  Kafka
关注(0)|答案(2)|浏览(421)

我们有一个使用kafka消息的java应用程序,使用 org.apache.kafka.clients.consumer.KafkaConsumer 我们已经创建了一个带有spring-kafka依赖项的spring引导应用程序,但是无法读取新项目中的消息。我们已经检查了明显的参数,包括引导服务器的主机名和端口(日志显示可以识别)、组、主题以及spring boot与原始使用者一样使用的参数 StringDeserializer . 这是我们的配置文件:

spring:
  kafka:
    bootstrap-servers: hostname1:9092,hostname2:9092
    consumer:
      auto-offset-reset: earliest
      group-id: our_group
      enable-auto-commit: false
      fetch-max-wait: 500
      max-poll-records: 1

kafka:
  topic:
    boot: topic.name

以及接收者:

@Component
public class Receiver {

    private static final Logger LOGGER = LoggerFactory.getLogger(Receiver.class);

    private CountDownLatch latch = new CountDownLatch(1);

    public CountDownLatch getLatch() {
        return latch;
    }

    @KafkaListener(topics = "${kafka.topic.boot}")
    public void receive(ConsumerRecord<?, ?> consumerRecord) {
        LOGGER.info("received payload='{}'", consumerRecord.toString());
        latch.countDown();
    }

}

以下是启动启动应用程序的代码:

@SpringBootApplication
public class EmsDemoUsingSpringBootApplication {

    public static void main(String[] args) {
        SpringApplication.run(EmsDemoUsingSpringBootApplication.class, args);
    }
}

正在捕获此异常:

org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 60000 ms.

有什么明显的我忽略了吗?最好的调试方法是什么?
谢谢

7dl7o3gd

7dl7o3gd1#

我也有这个问题,发生的是,我无法连接到服务器。您可以在中更改日志级别 application.properties 或者 application.yml 查看更多详细信息。恶魔在日志里。。

logging:
  level:
    root: WARN
    org.springframework: INFO
    org.apache.kafka: DEBUG

我被告知,Kafka无法处理名称查找和从我的经验,主机连接应该几乎总是fqdn名称(域名和所有)。在我的例子中,我认为我没有在我的虚拟框中设置域,并且不可能找到我的访客框,即使我们在同一个子网中并且 ping 作品。
另外,我为Kafka的部分创建了另一个主类,结果证明是错误的。这不是一个好的实践,您应该用 @EnableKafka 把这些设置放在yml文件中,就可以加载了。不需要另一个配置类。
我的消费者:

import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;

@Component
@Slf4j
public class KafkaConsumer {
    @KafkaListener(topics={"testtopic"})
    public void listen(@Payload String message) {
        log.info("Received message is {}", message);
    }
}

我的申请:

import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.autoconfigure.security.servlet.SecurityAutoConfiguration;
import org.springframework.kafka.annotation.EnableKafka;

@Slf4j
@SpringBootApplication(exclude = { SecurityAutoConfiguration.class })
@EnableKafka    // <----- I only had to add this line
public class SomeApplication {

    public static void main(String[] args) {
        SpringApplication.run(SomeApplication.class, args);
        log.info("Application launched. ");
    }
}

我的姓名:

logging:
  level:
    root: WARN
    org.springframework: INFO
    org.apache.kafka: DEBUG

spring:
  kafka:
    bootstrap-servers: <FQDN names here:9092>
    consumer:
      group-id: <unique-group-id>
      enable-auto-commit: false # never ack messsage when it is received.
    listener:
      ack-mode: manual # I am responsible to ack the messages

并启动应用程序。这就是全部。

xam8gpfp

xam8gpfp2#

我希望你错过了 KafkaListenerContainerFactory 需要在中指定的bean @Configuration 文件

@Bean
KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>>
kafkaManualAckListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, String> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
  factory.setConsumerFactory(new DefaultKafkaConsumerFactory<String, String>(new HashMap<String,Object>((Map)consumerConfig)));
  factory.setConcurrency(concurrentConsumerCount);
  factory.setBatchListener(true);
  return factory;
}

也可供消费者指定 KafkaListenerContainerFactory 就像 @KafkaListener(topics = ("${kafka.topic.boot}"), containerFactory = "kafkaManualAckListenerContainerFactory"

相关问题