start-spring-boot应用程序与spring集成kafka

ix0qys7i  于 2021-06-07  发布在  Kafka
关注(0)|答案(1)|浏览(380)

我正在开发一个spring引导应用程序,它使用以kafka主题为源的spring集成流。我们的集成流程从一个包含subscribablechannels和springframework.cloud.stream.annotation.input和output注解的接口开始。这些被配置为通过带有spring.cloud.stream.kafka.bindings的cloud config从kafka读取。
当应用程序第一次启动时,它立即开始阅读Kafka主题。这是一个问题,因为应用程序需要初始化一些本地的、不可持久的数据库,然后才能开始正确处理传入的kafka消息。
我们目前正在使用@postconstruct在kafka启动之前填充这些内存数据库,但这是次优的,因为应用程序无法使用eureka、feign等可靠地找到具有内存数据库最新数据的健康服务。
由于各种原因,无法更改体系结构,以便共享或预填充内存中的数据库。要知道,当我把它称为内存数据库时,我把事情简化了一点,它实际上是另一种服务。
启动spring启动应用程序的最佳方式是什么?这样,从kafka读取的集成流以暂停状态启动,并且在其他进程完成后可以取消暂停?

czfnxgou

czfnxgou1#

我想你用 KafkaMessageDrivenChannelAdapter 根据你提到的spring集成java dsl- Kafka.messageDrivenChannelAdapter() 准确地说。可以用 id 以及 autoStartup(false) . 因此,它不会立即开始消费Kafka的主题。无论何时你准备好消费,你都可以 start() 此组件将其作为 Lifecycle 从应用程序上下文使用提到的id。
或者您可以向控制总线发送适当的消息。
更新
如果您处理springcloudstream和kafka活页夹,您应该考虑注入一个 BindingsEndpoint bean并执行其 changeState(@Selector String name, State state) 你的绑定名和 State.STOPPED . 当内存数据库准备就绪时,用 State.STARTED : https://docs.spring.io/spring-cloud-stream/docs/elmhurst.release/reference/htmlsingle/#_binding_visualization_and_control

相关问题