java 如何等待未来在Kafka流图()?

pw9qyyiw  于 2023-01-01  发布在  Java
关注(0)|答案(1)|浏览(141)

我正在Java中实现Sping Boot 应用程序,使用Spring Cloud Stream和Kafka Streams绑定器。
我需要在KStreamMap方法中实现阻塞操作,如下所示:

public Consumer<KStream<?, ?>> sink() {
    return input -> input
        .mapValues(value -> methodReturningCompletableFuture(value).get())
        .foreach((key, value) -> otherMethod(key, value));
}

completableFuture.get()抛出异常(中断异常、执行异常)
如何处理这些异常,使链接的方法不被执行,Kafka消息不被确认?我不能承受消息丢失,将其发送到一个死信主题不是一个选项。
有没有更好的方法阻止map()内部?

hmmo2u0o

hmmo2u0o1#

您可以尝试Kafka Streams中的分支功能来控制链接方法的执行。例如,下面是一个您可以尝试的伪代码。您可以将其用作起点,并根据您的特定用例进行调整。

final Map<String, ? extends KStream<?, String>> branches = 
input.split()
     .branch(k, v) -> {
        try {
          methodReturningCompletableFuture(value).get();
          return true;
        }
        catch (Exception e) {
          return false;
        }
      }, Branched.as("good-records"))
      .defaultBranch();

final KStream<?, String> kStream = branches.get("good-records");

 kStream.foreach((key, value) -> otherMethod(key, value));

这里的想法是只把没有抛出异常的记录发送到命名分支good-records,其他的都进入默认分支,我们在伪代码中忽略它,然后只为那些“好的”记录调用额外的链式方法(如foreach调用所示)。
这并不能解决在抛出异常后不确认消息的问题。这似乎有点挑战性。然而,我对那个用例很好奇。当异常发生并且你处理它时,为什么你不想确认消息?如果不使用DLT,要求似乎有点严格。这里理想的解决方案是,您可能希望引入一些重试操作,一旦重试操作用尽,就将记录发送到DLT,使Kafka Streams使用者确认消息,然后应用程序移动到下一个偏移量。
调用methodReturningCompletableFuture(value).get()只是等待,直到达到默认的或配置的超时,假设methodReturningCompletableFuture()返回一个Future对象。因此,这已经是一个很好的方法来等待在KStreamMap操作。我不认为有任何其他必要使它等待更长的时间。

相关问题