我正在开发一个spring引导应用程序,它使用以kafka主题为源的spring集成流。我们的集成流程从一个包含subscribablechannels和springframework.cloud.stream.annotation.input和output注解的接口开始。这些被配置为通过带有spring.cloud.stream.kafka.bindings的cloud config从kafka读取。
当应用程序第一次启动时,它立即开始阅读Kafka主题。这是一个问题,因为应用程序需要初始化一些本地的、不可持久的数据库,然后才能开始正确处理传入的kafka消息。
我们目前正在使用@postconstruct在kafka启动之前填充这些内存数据库,但这是次优的,因为应用程序无法使用eureka、feign等可靠地找到具有内存数据库最新数据的健康服务。
由于各种原因,无法更改体系结构,以便共享或预填充内存中的数据库。要知道,当我把它称为内存数据库时,我把事情简化了一点,它实际上是另一种服务。
启动spring启动应用程序的最佳方式是什么?这样,从kafka读取的集成流以暂停状态启动,并且在其他进程完成后可以取消暂停?
1条答案
按热度按时间czfnxgou1#
我想你用
KafkaMessageDrivenChannelAdapter
根据你提到的spring集成java dsl-Kafka.messageDrivenChannelAdapter()
准确地说。可以用id
以及autoStartup(false)
. 因此,它不会立即开始消费Kafka的主题。无论何时你准备好消费,你都可以start()
此组件将其作为Lifecycle
从应用程序上下文使用提到的id。或者您可以向控制总线发送适当的消息。
更新
如果您处理springcloudstream和kafka活页夹,您应该考虑注入一个
BindingsEndpoint
bean并执行其changeState(@Selector String name, State state)
你的绑定名和State.STOPPED
. 当内存数据库准备就绪时,用State.STARTED
: https://docs.spring.io/spring-cloud-stream/docs/elmhurst.release/reference/htmlsingle/#_binding_visualization_and_control