如何重试kstream消息?

kg7wmglp  于 2021-07-15  发布在  Kafka
关注(0)|答案(0)|浏览(270)

我的任务是为kafka kstreams处理器中接收的消息实现重试机制。拓扑结构是这样的(名称/方法被更改以保护无辜者):

KStream<String, GenericRecord> inputStream = kStreamBuilder.stream(inputTopic,Consumed.with(Serdes.String(), getSerDe()));
        return inputStream
                .peek((key,value) -> metricsService.startTimer())
                .map((key, value) -> KeyValue.pair(new MessageKey(key), mapInputMessageToInternalMessage(key, value)))
                .mapValues(tokenizationService::detokenize)
                .mapValues(outputMapper::mapInternalMessageToOutputMessage)
                .peek((key, value) -> loggingUtil.startTransactionLog(value.getInputMessage()))
                .mapValues(someOtherProcessor::writeSomewhereElse)
                .filter(this::filterStream)
                .mapValues(this::toOutputMessageWrapper)
                ;

我意识到我已经离开了很多,但一般来说,我需要能够“重试”这个流程,如果它打破中间的某个地方。例如, detokenize 是外部rest api,可能会遇到暂时错误。如果是这样,我希望能够在指数退避计划上重试,希望它成功。
今天,每一家商店都有支票 mapValues 回调以查看前一步中是否发生了错误,如果是这样,它们基本上就是失败的。在这些情况下 toOutputMessageWrapper 将阻止将消息发送到输出主题。我要做的就是从头再来一遍。
目前我看到的唯一可能的方法(我远不是kafka/kakfastreamsMaven)是在kstream的每个阶段实现重试机制(在不同的阶段) mapValues 方法),而不是试图“重新开始”。
还有其他建议吗?

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题