我正在Java中实现Sping Boot 应用程序,使用Spring Cloud Stream和Kafka Streams绑定器。
我需要在KStreamMap方法中实现阻塞操作,如下所示:
public Consumer<KStream<?, ?>> sink() {
return input -> input
.mapValues(value -> methodReturningCompletableFuture(value).get())
.foreach((key, value) -> otherMethod(key, value));
}
completableFuture.get()
抛出异常(中断异常、执行异常)
如何处理这些异常,使链接的方法不被执行,Kafka消息不被确认?我不能承受消息丢失,将其发送到一个死信主题不是一个选项。
有没有更好的方法阻止map()
内部?
1条答案
按热度按时间hmmo2u0o1#
您可以尝试Kafka Streams中的分支功能来控制链接方法的执行。例如,下面是一个您可以尝试的伪代码。您可以将其用作起点,并根据您的特定用例进行调整。
这里的想法是只把没有抛出异常的记录发送到命名分支
good-records
,其他的都进入默认分支,我们在伪代码中忽略它,然后只为那些“好的”记录调用额外的链式方法(如foreach
调用所示)。这并不能解决在抛出异常后不确认消息的问题。这似乎有点挑战性。然而,我对那个用例很好奇。当异常发生并且你处理它时,为什么你不想确认消息?如果不使用DLT,要求似乎有点严格。这里理想的解决方案是,您可能希望引入一些重试操作,一旦重试操作用尽,就将记录发送到DLT,使Kafka Streams使用者确认消息,然后应用程序移动到下一个偏移量。
调用
methodReturningCompletableFuture(value).get()
只是等待,直到达到默认的或配置的超时,假设methodReturningCompletableFuture()
返回一个Future
对象。因此,这已经是一个很好的方法来等待在KStream
Map操作。我不认为有任何其他必要使它等待更长的时间。