springkafka:如果我在应用程序中定义了一个记录生产者,kafka消费者就不会收到记录

lb3vh1jj  于 2021-06-07  发布在  Kafka
关注(0)|答案(2)|浏览(289)

我正在用spring和springkafka写一个小poc。我的目标是有一个生产者和消费者,写信给(resp。阅读)从这个主题。
我遇到了一个奇怪的情况:
生产者正确地生成了记录(我可以通过python脚本使用它们)
消费者没有收到记录
但是,如果我从代码中删除生产者并通过另一种方法(例如,使用python脚本)生成记录,那么使用者将正确地接收记录。
下面是我的代码-它与文档的示例非常相似。更准确地说,问题来自这样一个事实:kafkaconsumerconfiguration中的bean不是由spring创建的(即,构建它们的方法从未被调用)。

制作人

kafkaproducerconfiguration.java文件

@Configuration
public class KafkaProducerConfiguration {

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }

    @Bean
    public ProducerFactory<String, String> producerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:32768");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return new DefaultKafkaProducerFactory<>(props);
    }
}

messagesender.java文件

@Component
public class MessageSender {
    final static private Logger log = Logger.getLogger(MessageSender.class);

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    @PostConstruct
    public void onConstruct() throws InterruptedException {
        log.info("Sending messages...");
        for (int i = 0; i < 100; ++i) {
            kafkaTemplate.send("mytopic", "this is a message");
            Thread.sleep(1000);
        }
        kafkaTemplate.flush(); // NOTE: no changes if I move this call in the loop
        log.info("Done sending messages");
    }
}

消费者

kafkaconsumerconfiguration.java文件

@Configuration
@EnableKafka
public class KafkaConsumerConfiguration {

    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());

        return factory;
    }

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:32768");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-service");
        return props;
    }
}

mymessagelistener.java文件

@Service
public class MyMessageListener {
    final static private Logger log = Logger.getLogger(MyMessageListener.class);

    @PostConstruct
    public void onConstruct() {
        log.info("Message listener started");
    }

    @KafkaListener(topics = "mytopic")
    public void onMessageReceived(String message) {
        log.info("Got message: "+ message);
    }
}

以下是应用程序生成的日志,供参考:https://pastebin.com/by783jil. 如您所见,没有创建使用者bean(否则会有一个块) ConsumerConfig values: ... .
以下是我尝试过但没有成功的几件事:
将生产者和使用者配置放在同一配置类中
在kafkaconsumerconfiguration中更改bean的名称(并添加注解属性 containerFactory = "myBeanName"MyMessageListener.onMessageReceived 方法)
更改类的名称 KafkaConsumerConfiguration 去别的地方
添加与Kafka无关的 @Bean 在我的 KafkaConsumerConfiguration 看看它是否会被创建(它会)
版本:spring boot 1.5.9,spring kafka 1.1.7。
我已经把头发拔了好几个小时了,谢谢你的帮助。
谢谢!

aamkag61

aamkag611#

刚发现问题。 MessageSender.onConstruct 实际上需要花费大量的时间来执行(100秒),在此期间它会阻止spring创建其他bean。

cnh2zyt3

cnh2zyt32#

kafkaTemplate.send("mytopic", "this is a message");

您不应该在一个特定的环境中开始与外部服务交互 @PostConstruct 方法-您需要等待应用程序生成后再执行此操作。
实施 SmartLifecyle ,返回 true 为了 isAutoStartup 把代码移到 start() .
或实施 ApplicationListener<ConstextRefreshedEvent> 收到事件后再发送。
任何一种方法都将确保应用程序准备就绪。

相关问题