无法在使用SpringCloud流绑定器的SpringBoot应用程序中检索kafkastreams对象

lmyy7pcs  于 2021-06-04  发布在  Kafka
关注(0)|答案(2)|浏览(329)

所以我的问题是,我在属性文件中定义了一些Kafka主题,我可以阅读 KafkaStream<String, String> 在我的springboot应用程序中没有问题。但我想进入 KafkaStreams 对象,以便我可以打印 KafkaStreams 有助于开发的拓扑结构。在我的一个房间里 @StreamListener 我尝试检索stream builder进程bean,以便获得底层 KafkaStreams 以这种方式对象(如下所述:https://cloud.spring.io/spring-cloud-stream-binder-kafka/spring-cloud-stream-binder-kafka.html#_accessing_the_underlying_kafkastreams_object)但不幸的是,它没有起作用。代码如下:

@StreamListener
public void processEvent(@Input("order-paid-stream") KStream<String, String> inputStream) {
    StreamsBuilderFactoryBean streamsBuilderFactoryBean = applicationContext.getBean("&stream-builder-process", StreamsBuilderFactoryBean.class);
    KafkaStreams kafkaStreams = streamsBuilderFactoryBean.getKafkaStreams();
    System.out.println(kafkaStreams.toString());
    inputStream.foreach(this::handleMessage);
}

应用程序启动后,我收到以下消息:

我也试着找回 KafkaStreams 在应用程序启动后,以相同的方式在我的一个rest控制器方法上创建一个对象,并出现类似的错误(找不到该名称的bean)。
有什么帮助吗?

hyrbngr7

hyrbngr71#

从一个延迟的独立线程开始,以确保创建kafkastream对象并完成拓扑。
streamsbuilderfactorybean.getkafkastreams()返回kafkastream对象streamsbuilderfactorybean.getsingletoninstance()。topology返回拓扑对象streamsbuilderfactorybean.getstreamsconfiguration()返回所有设置的kafka流配置

class StreamsListener {
    @StreamListener
    @SendTo("output")
    public KStream<String, String> process(@Input("input') KStream<String,String> rawCloudEventKStream {

        new Thread(() -> {
            try { Thread.sleep(30000); } catch (InterruptedException e) { e.printStackTrace(); }

            StreamsBuilderFactoryBean streamsBuilderFactoryBean = context.getBean("&stream-builder-StreamsListener-process", StreamsBuilderFactoryBean.class);
            System.out.println("KafkaStreams configs: " + streamsBuilderFactoryBean.getStreamsConfiguration());
            KafkaStreams kafkaStreams = streamsBuilderFactoryBean.getKafkaStreams();
        }).start();

        ...
    }
}
hxzsmxv2

hxzsmxv22#

我遇到了一个类似的问题,我发现我还需要包含进程函数在其中定义的类名。 stream-builder-MyStreamProcessor-process .

相关问题