我使用的是axon版本(4.3),它无缝地支持springboot主类中带有注解的kafka,使用
@SpringBootApplication(exclude = org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration.class)
在我的例子中,消息成功地保存在主题中,但是这个消费者的问题是,我无法使用主题中的消息。
你的配置手册?
<dependency>
<groupId>org.axonframework</groupId>
<artifactId>axon-spring-boot-starter</artifactId>
<version>4.3</version>
<exclusions>
<exclusion>
<groupId>org.axonframework</groupId>
<artifactId>axon-server-connector</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.axonframework.extensions.kafka</groupId>
<artifactId>axon-kafka-spring-boot-starter</artifactId>
<version>4.0-RC3</version>
</dependency>
应用程序.yml
axon:
eventhandling:
processors:
conventions:
source: kafkaMessageSource
mode: tracking
serializer:
general: jackson
kafka:
client-id: consumer_service
default-topic: topic_x
bootstrap-servers:
- 127.0.0.1:9092
侦听器.java
//@Component
@ProcessingGroup(value = "conventions")
public class Listener {
private static final Logger LOGGER = LoggerFactory.getLogger(GenericListener.class);
@EventHandler
void on(ConventionCreatedEvent event) {
LOGGER.info("got the event {}", event);
}
}
1条答案
按热度按时间bejyjqdl1#
我认为这是Kafka消息源的名字在你的配置,这是罪魁祸首现在艾门。
当使用axon的auto-configuration和axon-kafka auto-configuration时,如果没有任何关于所需kafka消息源类型的细节,则
StreamableKafkaMessageSource
将创建。那个豆子的名字就是streamableKafkaMessageSource
.在你的
application.yml
然而,你期望source
为了你的conventions
要调用的跟踪事件处理器kafkaMessageSource
.接下来,您可以看看axon的kafka扩展中包含的示例应用程序。也许这会让事情更清楚一点。