Spring BootKafka消费者

vngu2lb8  于 2021-06-04  发布在  Kafka
关注(0)|答案(1)|浏览(344)

使用spring集成kafka dsl,我想知道为什么侦听器不接收消息?但是如果我用一个带有kafkalistener注解的方法替换spring集成dsl,同样的应用程序能够很好地使用消息。dsl缺少什么?
不使用的dsl代码:

@Configuration
@EnableKafka
class KafkaConfig {
    //consumer factory provided by Spring boot
    @Bean
    IntegrationFlow inboundKafkaEventFlow(ConsumerFactory consumerFactory) {
        IntegrationFlows
                .from(Kafka
                .messageDrivenChannelAdapter(consumerFactory, "kafkaTopic")
                .configureListenerContainer({ c -> c.groupId('kafka-consumer-staging') })
                .id("kafkaTopicListener").autoStartup(true)
        )

                .channel("logChannel")
                .get()
    }
}

logchannel(或我使用的任何其他通道)不反映入站消息。
与上面的代码不同,如果我使用普通侦听器,那么使用消息就可以了。

@Component
class KafkaConsumer {
    @KafkaListener(topics = ['kafkaTopic'], groupId = 'kafka-consumer-staging')
    void inboundKafkaEvent(String message) {
        log.debug("message is {}", message)
    }
}

这两种方法对kafka使用者使用相同的application.properties。

lymgl2op

lymgl2op1#

您缺少使用spring集成的事实,但是您还没有在应用程序中启用它。不过,你不需要为Kafka这样做,因为你不打算把它和 @KafkaListener . 因此,要启用spring集成基础设施,您需要添加 @EnableIntegration 在你的 @Configuration 班级:https://docs.spring.io/spring-integration/docs/5.1.6.release/reference/html/#configuration-启用集成

相关问题