我有一个“任务”应用程序,它的寿命很短,根据数据库中的状态向Kafka生成消息。我正在使用springcloudstream生成消息,使用下面的应用程序格式。我遵循springcloudstream文档中的这种格式将任意数据发送到输出绑定。
private EmitterProcessor<Message<GenericRecord>> processor;
@Override
public void run(ApplicationArguments arg0) {
// ... create Message<GenericRecord> producerRecord
this.processor.onNext(producerRecord);
}
@Bean
public Supplier<Flux<Message<GenericRecord>>> supplier() {
return () -> this.processor;
}
public static void main(String[] args) {
ConfigurableApplicationContext ctx = SpringApplication.run(Application.class, args);
ctx.close();
}
应用程序运行,创建记录,运行onnext(),然后退出。然后我查看是否有任何消息已发布,但没有关于该主题的消息。然后我添加了一个 Thread.sleep(10000)
在生成每条消息并以主题结束之后。
在查看了React堆的文档之后,我并没有看到任何清晰的方法来实现这一点。有没有办法在spring应用程序退出之前等待emitterprocessor完成消息的发布?
1条答案
按热度按时间fruv7luv1#
你有没有具体的理由使用
EmittorProcessor
? 我认为这个用例可以通过使用StreamBridge
. 例如。然后提供以下配置:
spring.cloud.stream.source: process
你可以找到更多关于StreamBridge
在参考文件中。