kotlin 小黑麦Kafka同步发生器

kqqjbcuj  于 2023-08-06  发布在  Kotlin
关注(0)|答案(1)|浏览(89)

我在lambda中运行一个Quarkus Kafka生产者。事情是这样的,我想阻塞主线程,直到所有的消息都被产生(和确认),直到我终止lambda执行。我明白了,我通常可以使用microprofile Emitter的CompletionStage<Void> send(T msg);,但是,它只接受有效载荷而不是消息,我需要发送传出Kafka消息的元数据。你能想个办法吗?

nx7onnlm

nx7onnlm1#

您可以向Message添加确认和否定确认函数,在其中可以完成future。然后,可以在发送消息后等待该未来。

@Inject
@Channel("example-channel")
@OnOverflow(value = OnOverflow.Strategy.THROW_EXCEPTION)
Emitter<ObjectToSend> objectEmitter;

@Blocking
public void emitNewObject(ObjectToSend objectToSend) {
    try {
        CompletableFuture<Void> future = new CompletableFuture<>();

        Message<ObjectToSend> message = buildMessage(objectToSend, future);
        objectEmitter.send(message);

        future.get();
    } catch (InterruptedException | ExecutionException e) {
        log.error("Error sending message", e);
        throw new RuntimeException(e);
    }
}

private Message<ObjectToSend> buildMessage(ObjectToSend objectToSend, CompletableFuture<Void> future) {
    return Message.of(objectToSend)
        .withAck(() -> {
            future.complete(null);
            return CompletableFuture.completedFuture(null);
        }).withNack(reason -> {
            future.completeExceptionally(reason);
            return CompletableFuture.completedFuture(null);
        });
}

字符串
事实上,您在问题中提到的方法的发射器实现也做了同样的事情。

相关问题