java—在运行时为消息驱动的通道适配器向kafka使用者主题传递动态值

hmae6n7t  于 2021-06-07  发布在  Kafka
关注(0)|答案(1)|浏览(546)

要求在运行时设置下面的topics属性,而不重新启动服务器。我们如何在这里实现它。当前,我们正在从属性文件读取值,但这里需要重新启动服务器以反映所做的更改。
示例:sample.properties(在部署目录中)
topic.list=主题1,主题2
并且希望在将来不重新启动服务器的情况下使用topic3。
注意:发现主题是最后一个变量。
尝试从文件系统路径(部署目录外)读取密钥(topic.list),但没有成功。
任何建议。

<int-kafka:message-driven-channel-adapter

               id="inAdapter"
               channel="fromKafka"
               connection-factory="connectionFactory"
               key-decoder="kafkaKeyDecoder"
               payload-decoder="kafkaDecoder"                              
               topics="${topic.list}"
               offset-manager="offsetManager"/>
u7up0aaq

u7up0aaq1#

您可以使用JavaDSL根据需要为其他主题动态添加适配器。。。

@Autowired
private IntegrationFlowContext flowContext;

public void addAnotherListenerForTopics(String... topics) {
    IntegrationFlow flow =
        IntegrationFlows.from(Kafka.messageDrivenChannelAdapter(consumerFactory(), topics))
            .channel("fromKafka")
            .get();
    this.flowContext.registration(flow).register();
}

bean.addAnotherListenerForTopics("added.new");

聚甲醛:

<dependency>
  <groupId>org.springframework.integration</groupId>
  <artifactId>spring-integration-java-dsl</artifactId>
  <version>1.2.1.RELEASE</version>
</dependency>

注意,如果您使用的是代理分区分配,那么新容器需要一个不同的组id,以避免撤销现有的分配。

相关问题