我在lambda中运行一个Quarkus Kafka生产者。事情是这样的,我想阻塞主线程,直到所有的消息都被产生(和确认),直到我终止lambda执行。我明白了,我通常可以使用microprofile Emitter的CompletionStage<Void> send(T msg);,但是,它只接受有效载荷而不是消息,我需要发送传出Kafka消息的元数据。你能想个办法吗?
CompletionStage<Void> send(T msg);
nx7onnlm1#
您可以向Message添加确认和否定确认函数,在其中可以完成future。然后,可以在发送消息后等待该未来。
Message
@Inject @Channel("example-channel") @OnOverflow(value = OnOverflow.Strategy.THROW_EXCEPTION) Emitter<ObjectToSend> objectEmitter; @Blocking public void emitNewObject(ObjectToSend objectToSend) { try { CompletableFuture<Void> future = new CompletableFuture<>(); Message<ObjectToSend> message = buildMessage(objectToSend, future); objectEmitter.send(message); future.get(); } catch (InterruptedException | ExecutionException e) { log.error("Error sending message", e); throw new RuntimeException(e); } } private Message<ObjectToSend> buildMessage(ObjectToSend objectToSend, CompletableFuture<Void> future) { return Message.of(objectToSend) .withAck(() -> { future.complete(null); return CompletableFuture.completedFuture(null); }).withNack(reason -> { future.completeExceptionally(reason); return CompletableFuture.completedFuture(null); }); }
字符串事实上,您在问题中提到的方法的发射器实现也做了同样的事情。
1条答案
按热度按时间nx7onnlm1#
您可以向
Message
添加确认和否定确认函数,在其中可以完成future。然后,可以在发送消息后等待该未来。字符串
事实上,您在问题中提到的方法的发射器实现也做了同样的事情。