我正在使用redis streams和spring数据redis 2.2.4。我想弄清楚的一件事是什么是正确的错误处理方法。
对于无React模型,我们将errorhandler和EANCELSubscriptionError设置为 StreamReadRequest
. 然后我们可以控制应该取消流上订阅的异常类型。
例子:
StreamReadRequest<String> streamReadRequest = StreamReadRequest.builder(offset)
.errorHandler(streamListener.getErrorHandler())
.cancelOnError(e -> false)
.consumer(consumer)
.autoAck(true)
.build();
但是对于React模型,我没有发现任何处理API的错误 StreamReceiver
. 当业务消息处理程序抛出异常时。redis流上的订阅将被取消,无法再接收消息。因此,我必须确保我的业务逻辑代码中没有异常。请参见下面的示例:
public void init() {
StreamOffset<String> offset = StreamOffset.create(STREAM_NAME, ReadOffset.lastConsumed());
receiver.receiveAutoAck(consumer, offset)
.flatMap(this::onMessage)
.subscribe();
}
private Mono<Long> onMessage(ObjectRecord<String, MyEvent> message) {
try {
return redisTemplate.opsForStream().acknowledge(groupName, message);
} catch (Exception e) {
return Mono.just(-1L);
}
}
暂无答案!
目前还没有任何答案,快来回答吧!