我在springboot项目中使用了KafkaStreams
。
KafkaStreamFlow定义类:
@Component
public class KafkaStreamFlow {
@PostConstruct
public void flow() {
//...
KStream<String, Data> st1 = st0
.transform(() -> new CustomTransformer(),...);
}
}
字符串
有没有可能不传递Transformer
的新示例,而是传递一个Bean
?
据我所知,topology会为每个kafkastream的task创建Transformer的新Instance。
如果我传递给transform方法
.transform(() -> beanCustomTransformer(),...);
型
会不会出问题?
1条答案
按热度按时间vsnjm48y1#
您应该让拓扑为您创建处理器/Transformer。默认情况下,spring beans是单例的,但是Kafka Streams在您的Transformer创建中没有这样的保证。
根据我的经验,通常最好创建工厂/提供者,让拓扑为您处理这些。如果你仍然想使用spring,考虑在supplier中提供你的bean,比如
Supplier<MyBean>
,这样拓扑就可以做它自己的事情,你仍然可以在configuration classes中或通过annotations来做任何你需要的spring配置。所以你的配置类应该是这样的:
字符串
然后将其注入到TopologyCreator中:
型