我需要从配置文件动态创建kafka流,其中包含源主题名称和每个流的配置。应用程序需要有几十个Kafka流和流将不同的每个环境(例如阶段,产品)。有没有可能这样做 spring-kafka
图书馆?
我们可以很容易的做到 kafka-streams
:
@Bean
public List<KafkaStreams> kafkaStreams() {
return streamRouteProperties.stream()
.map(routeProperty -> createKafkaStream(routeProperty))
.collect(toList());
}
private KafkaStreams createKafkaStream(KafkaConfigurationProperties kafkaProperties) {
StreamsBuilder builder = new StreamsBuilder();
KStream<Object, String> stream = builder.stream(kafkaProperties.getTopicName());
Topology topology = builder.build();
StreamsConfig streamsConfig = new StreamsConfig(kafkaProperties.getSettings());
return new KafkaStreams(topology, streamsConfig);
}
我们需要实现spring SmartLifecycle
接口,因此所有流将自动启动和关闭。
有没有可能用同样的方法 spring-kafka
? 如我所见,我们需要在代码中创建每个kafka流,我看不到如何使用创建kafka流列表的可能性 StreamsBuilderFactoryBean
. 对于每个需要的流,我需要执行以下操作:
@Bean
public KStream<?, ?> kStream(StreamsBuilder streamsBuilder) {
Consumed<String, String> consumed = ..;
KStream<String, String> kStream = streamsBuilder.stream(topicName, consumed);
kStream.process(() -> eventProcessor);
return kStream;
}
@Bean
public FactoryBean<StreamsBuilder> streamsBuilder() {
return new StreamsBuilderFactoryBean(streamsConfig);
}
但是如何使用 StreamsBuilderFactoryBean
?
1条答案
按热度按时间nwwlzxa71#
这个
StreamsBuilderFactoryBean
只是为spring应用程序上下文带来了一些固执己见的、方便的api,但这并不意味着您应该总是被它束缚住。幸运的是
StreamsBuilderFactoryBean
比普通的Kafka流没有太多的价值。它最大的作用是对内部创建的KafkaStreams
.您可以随意使用原始的kafka streams api,不要试图使代码过于复杂化,从而使其成为基于应用程序的需求
StreamsBuilderFactoryBean
,它实际上是为一组静态选项而设计的。