spring kafka-动态创建流

f5emj3cl  于 2021-06-08  发布在  Kafka
关注(0)|答案(1)|浏览(508)

我需要从配置文件动态创建kafka流,其中包含源主题名称和每个流的配置。应用程序需要有几十个Kafka流和流将不同的每个环境(例如阶段,产品)。有没有可能这样做 spring-kafka 图书馆?
我们可以很容易的做到 kafka-streams :

@Bean
public List<KafkaStreams> kafkaStreams() {
    return streamRouteProperties.stream()
            .map(routeProperty -> createKafkaStream(routeProperty))
            .collect(toList());
}

private KafkaStreams createKafkaStream(KafkaConfigurationProperties kafkaProperties) {
    StreamsBuilder builder = new StreamsBuilder();
    KStream<Object, String> stream = builder.stream(kafkaProperties.getTopicName());
    Topology topology = builder.build();
    StreamsConfig streamsConfig = new StreamsConfig(kafkaProperties.getSettings());
    return new KafkaStreams(topology, streamsConfig);
}

我们需要实现spring SmartLifecycle 接口,因此所有流将自动启动和关闭。
有没有可能用同样的方法 spring-kafka ? 如我所见,我们需要在代码中创建每个kafka流,我看不到如何使用创建kafka流列表的可能性 StreamsBuilderFactoryBean . 对于每个需要的流,我需要执行以下操作:

@Bean
public KStream<?, ?> kStream(StreamsBuilder streamsBuilder) {
    Consumed<String, String> consumed = ..;
    KStream<String, String> kStream = streamsBuilder.stream(topicName, consumed);
    kStream.process(() -> eventProcessor);
    return kStream;   
}

@Bean
public FactoryBean<StreamsBuilder> streamsBuilder() {
    return new StreamsBuilderFactoryBean(streamsConfig);
}

但是如何使用 StreamsBuilderFactoryBean ?

nwwlzxa7

nwwlzxa71#

这个 StreamsBuilderFactoryBean 只是为spring应用程序上下文带来了一些固执己见的、方便的api,但这并不意味着您应该总是被它束缚住。
幸运的是 StreamsBuilderFactoryBean 比普通的Kafka流没有太多的价值。它最大的作用是对内部创建的 KafkaStreams .
您可以随意使用原始的kafka streams api,不要试图使代码过于复杂化,从而使其成为基于应用程序的需求 StreamsBuilderFactoryBean ,它实际上是为一组静态选项而设计的。

相关问题