我正在用spring和springkafka写一个小poc。我的目标是有一个生产者和消费者,写信给(resp。阅读)从这个主题。
我遇到了一个奇怪的情况:
生产者正确地生成了记录(我可以通过python脚本使用它们)
消费者没有收到记录
但是,如果我从代码中删除生产者并通过另一种方法(例如,使用python脚本)生成记录,那么使用者将正确地接收记录。
下面是我的代码-它与文档的示例非常相似。更准确地说,问题来自这样一个事实:kafkaconsumerconfiguration中的bean不是由spring创建的(即,构建它们的方法从未被调用)。
制作人
kafkaproducerconfiguration.java文件
@Configuration
public class KafkaProducerConfiguration {
@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
@Bean
public ProducerFactory<String, String> producerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:32768");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
return new DefaultKafkaProducerFactory<>(props);
}
}
messagesender.java文件
@Component
public class MessageSender {
final static private Logger log = Logger.getLogger(MessageSender.class);
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
@PostConstruct
public void onConstruct() throws InterruptedException {
log.info("Sending messages...");
for (int i = 0; i < 100; ++i) {
kafkaTemplate.send("mytopic", "this is a message");
Thread.sleep(1000);
}
kafkaTemplate.flush(); // NOTE: no changes if I move this call in the loop
log.info("Done sending messages");
}
}
消费者
kafkaconsumerconfiguration.java文件
@Configuration
@EnableKafka
public class KafkaConsumerConfiguration {
@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
return factory;
}
@Bean
public ConsumerFactory<String, String> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerConfigs());
}
@Bean
public Map<String, Object> consumerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:32768");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-service");
return props;
}
}
mymessagelistener.java文件
@Service
public class MyMessageListener {
final static private Logger log = Logger.getLogger(MyMessageListener.class);
@PostConstruct
public void onConstruct() {
log.info("Message listener started");
}
@KafkaListener(topics = "mytopic")
public void onMessageReceived(String message) {
log.info("Got message: "+ message);
}
}
以下是应用程序生成的日志,供参考:https://pastebin.com/by783jil. 如您所见,没有创建使用者bean(否则会有一个块) ConsumerConfig values: ...
.
以下是我尝试过但没有成功的几件事:
将生产者和使用者配置放在同一配置类中
在kafkaconsumerconfiguration中更改bean的名称(并添加注解属性 containerFactory = "myBeanName"
上 MyMessageListener.onMessageReceived
方法)
更改类的名称 KafkaConsumerConfiguration
去别的地方
添加与Kafka无关的 @Bean
在我的 KafkaConsumerConfiguration
看看它是否会被创建(它会)
版本:spring boot 1.5.9,spring kafka 1.1.7。
我已经把头发拔了好几个小时了,谢谢你的帮助。
谢谢!
2条答案
按热度按时间aamkag611#
刚发现问题。
MessageSender.onConstruct
实际上需要花费大量的时间来执行(100秒),在此期间它会阻止spring创建其他bean。cnh2zyt32#
您不应该在一个特定的环境中开始与外部服务交互
@PostConstruct
方法-您需要等待应用程序生成后再执行此操作。实施
SmartLifecyle
,返回true
为了isAutoStartup
把代码移到start()
.或实施
ApplicationListener<ConstextRefreshedEvent>
收到事件后再发送。任何一种方法都将确保应用程序准备就绪。