所以我的问题是,我在属性文件中定义了一些Kafka主题,我可以阅读 KafkaStream<String, String>
在我的springboot应用程序中没有问题。但我想进入 KafkaStreams
对象,以便我可以打印 KafkaStreams
有助于开发的拓扑结构。在我的一个房间里 @StreamListener
我尝试检索stream builder进程bean,以便获得底层 KafkaStreams
以这种方式对象(如下所述:https://cloud.spring.io/spring-cloud-stream-binder-kafka/spring-cloud-stream-binder-kafka.html#_accessing_the_underlying_kafkastreams_object)但不幸的是,它没有起作用。代码如下:
@StreamListener
public void processEvent(@Input("order-paid-stream") KStream<String, String> inputStream) {
StreamsBuilderFactoryBean streamsBuilderFactoryBean = applicationContext.getBean("&stream-builder-process", StreamsBuilderFactoryBean.class);
KafkaStreams kafkaStreams = streamsBuilderFactoryBean.getKafkaStreams();
System.out.println(kafkaStreams.toString());
inputStream.foreach(this::handleMessage);
}
应用程序启动后,我收到以下消息:
我也试着找回 KafkaStreams
在应用程序启动后,以相同的方式在我的一个rest控制器方法上创建一个对象,并出现类似的错误(找不到该名称的bean)。
有什么帮助吗?
2条答案
按热度按时间hyrbngr71#
从一个延迟的独立线程开始,以确保创建kafkastream对象并完成拓扑。
streamsbuilderfactorybean.getkafkastreams()返回kafkastream对象streamsbuilderfactorybean.getsingletoninstance()。topology返回拓扑对象streamsbuilderfactorybean.getstreamsconfiguration()返回所有设置的kafka流配置
hxzsmxv22#
我遇到了一个类似的问题,我发现我还需要包含进程函数在其中定义的类名。
stream-builder-MyStreamProcessor-process
.