spring云流应用程序在供应商完成之前退出

plupiseo  于 2021-06-04  发布在  Kafka
关注(0)|答案(1)|浏览(441)

我有一个“任务”应用程序,它的寿命很短,根据数据库中的状态向Kafka生成消息。我正在使用springcloudstream生成消息,使用下面的应用程序格式。我遵循springcloudstream文档中的这种格式将任意数据发送到输出绑定。

private EmitterProcessor<Message<GenericRecord>> processor;
@Override
public void run(ApplicationArguments arg0) {
    // ... create Message<GenericRecord> producerRecord
    this.processor.onNext(producerRecord);
}
@Bean
public Supplier<Flux<Message<GenericRecord>>> supplier() {
    return () -> this.processor;
}
public static void main(String[] args) {
    ConfigurableApplicationContext ctx = SpringApplication.run(Application.class, args);
    ctx.close();
}

应用程序运行,创建记录,运行onnext(),然后退出。然后我查看是否有任何消息已发布,但没有关于该主题的消息。然后我添加了一个 Thread.sleep(10000) 在生成每条消息并以主题结束之后。
在查看了React堆的文档之后,我并没有看到任何清晰的方法来实现这一点。有没有办法在spring应用程序退出之前等待emitterprocessor完成消息的发布?

fruv7luv

fruv7luv1#

你有没有具体的理由使用 EmittorProcessor ? 我认为这个用例可以通过使用 StreamBridge . 例如。

@Autowired StreamBridge streamBridge;

@Override
public void run(ApplicationArguments arg0) {
    // ... create Message<GenericRecord> producerRecord
    this.streamBridge.send("process-out-0", producerRecord);
}

然后提供以下配置: spring.cloud.stream.source: process 你可以找到更多关于 StreamBridge 在参考文件中。

相关问题