当有多个消费者时,我不能听Kafka主题(我的案例2主题)。在下面的示例中,我有两个消费者工厂,它们将接收两个不同的json消息(一个是用户类型,另一个是事件类型)。两条消息都发布到不同的主题。在这里,当我试图访问来自topic1的事件消息时,我无法访问,但我可以访问用户主题消息。
前任:
@Configuration
@EnableKafka
public class KafkaConsumerConfiguration {
@Autowired
private Environment environment;
@Bean
public ConsumerFactory<String,User> consumerFactory() {
Map<String, Object> config = new HashMap<>();
config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, environment.getProperty("bootstrap.servers"));
config.put(ConsumerConfig.GROUP_ID_CONFIG, environment.getProperty("user.consumer.group"));
config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,JsonDeserializer.class);
return new DefaultKafkaConsumerFactory<>(config, new StringDeserializer(),
new JsonDeserializer<>(User.class));
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, User> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, User> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
return factory;
}
@Bean
public ConsumerFactory<String , Event> consumerFactoryEvent(){
Map<String, Object> config = new HashMap<>();
config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, environment.getProperty("bootstrap.servers"));
config.put(ConsumerConfig.GROUP_ID_CONFIG, environment.getProperty("event.consumer.group"));
config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,JsonDeserializer.class);
return new DefaultKafkaConsumerFactory<>(config, new StringDeserializer(),
new JsonDeserializer<>(Event.class));
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, Event> kafkaListenerContainerFactoryEvent() {
ConcurrentKafkaListenerContainerFactory<String, Event> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactoryEvent());
return factory;
}
}
我的主要申请如下:
@KafkaListener(topics = "${event.topic}")
public void processEvent(Event event) {
..do something..
..post the message to User topic
}
@KafkaListener(topics = "${user.topic}")
public void processUser(User user) {
..do something..
}
我的需要是先听事件主题,对消息做一些按摩,然后把它发送给用户主题,我有另一种方法,可以听用户主题,并对该消息做一些操作。。我试着给@kafkalistener传递不同的选项,比如
@KafkaListener(topics="${event.topic}",containerFactory="kafkaListenerContainerFactoryEvent")
但它不起作用。。我不确定出了什么问题。。任何建议都是有用的!
2条答案
按热度按时间hwazgwia1#
在任何文档中都不容易找到。
这里我以消费来自
topic=topic1,bootstrapserver=url1(json序列化程序和反序列化程序)
topic=topic2,bootstrapserver=url2(avro序列化程序和反序列化程序)
第一步:-
第二步:-
@springbootapplication(exclude=kafkaautoconfiguration.class)=>不要从yml或spring.kafka@configurationproperties定义的属性文件中读取值
第三步:-
wpx232ag2#
如果您没有在bean中指定名称,那么方法名称将是bean名称,请在中添加groupid为的bean名称
@KafkaListener
```@KafkaListener(topics="${event.topic}",containerFactory="kafkaListenerContainerFactoryEvent", groupId="")
@KafkaListener(topics="${event.topic}",containerFactory="kafkaListenerContainerFactory", groupId="")