Spring Boot KafkaStream将Bean作为Transformer传递

avwztpqn  于 2023-08-04  发布在  Spring
关注(0)|答案(1)|浏览(103)

我在springboot项目中使用了KafkaStreams
KafkaStreamFlow定义类:

@Component
public class KafkaStreamFlow {
   @PostConstruct    
   public void flow() {

       //...

       KStream<String, Data> st1 = st0
                    .transform(() -> new CustomTransformer(),...);

   }
}

字符串
有没有可能不传递Transformer的新示例,而是传递一个Bean
据我所知,topology会为每个kafkastream的task创建Transformer的新Instance
如果我传递给transform方法

.transform(() -> beanCustomTransformer(),...);


会不会出问题?

vsnjm48y

vsnjm48y1#

您应该让拓扑为您创建处理器/Transformer。默认情况下,spring beans是单例的,但是Kafka Streams在您的Transformer创建中没有这样的保证。
根据我的经验,通常最好创建工厂/提供者,让拓扑为您处理这些。如果你仍然想使用spring,考虑在supplier中提供你的bean,比如Supplier<MyBean>,这样拓扑就可以做它自己的事情,你仍然可以在configuration classes中或通过annotations来做任何你需要的spring配置。
所以你的配置类应该是这样的:

@Configuration
public class MyConfigClass {
    @Bean
    public Supplier<CustomTransformer> myBeanSupplier() {
        return () -> new CustomTransformer();
    }
}

字符串
然后将其注入到TopologyCreator中:

@Component
class MyTopology {
    private Supplier<CustomTransformer> supplier;
    
    // constructor injection for supplier

     @PostConstruct    
   public void flow() {

       //...

       KStream<String, Data> st1 = st0
                    .transform(supplier,...);

   }
}

相关问题