使用spring集成kafka dsl,我想知道为什么侦听器不接收消息?但是如果我用一个带有kafkalistener注解的方法替换spring集成dsl,同样的应用程序能够很好地使用消息。dsl缺少什么?
不使用的dsl代码:
@Configuration
@EnableKafka
class KafkaConfig {
//consumer factory provided by Spring boot
@Bean
IntegrationFlow inboundKafkaEventFlow(ConsumerFactory consumerFactory) {
IntegrationFlows
.from(Kafka
.messageDrivenChannelAdapter(consumerFactory, "kafkaTopic")
.configureListenerContainer({ c -> c.groupId('kafka-consumer-staging') })
.id("kafkaTopicListener").autoStartup(true)
)
.channel("logChannel")
.get()
}
}
logchannel(或我使用的任何其他通道)不反映入站消息。
与上面的代码不同,如果我使用普通侦听器,那么使用消息就可以了。
@Component
class KafkaConsumer {
@KafkaListener(topics = ['kafkaTopic'], groupId = 'kafka-consumer-staging')
void inboundKafkaEvent(String message) {
log.debug("message is {}", message)
}
}
这两种方法对kafka使用者使用相同的application.properties。
1条答案
按热度按时间lymgl2op1#
您缺少使用spring集成的事实,但是您还没有在应用程序中启用它。不过,你不需要为Kafka这样做,因为你不打算把它和
@KafkaListener
. 因此,要启用spring集成基础设施,您需要添加@EnableIntegration
在你的@Configuration
班级:https://docs.spring.io/spring-integration/docs/5.1.6.release/reference/html/#configuration-启用集成