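I have a web application in which I use a Kinesis stream with spring-cloud-stream, spring-cloud-stream-binder-kinesis and spring-cloud-starter-sleuth. What I want is to configure error handling correctly. I want to:
1. Process the message stream.
2. If an error occurs in step 1, handle it (specifically, per stream).
3. Create an errorChannel that globally handles all errors raised in step 2.
Properties: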
spring:
  cloud:
    stream:
      function:
        definition: destination1
      bindings:
        destination1-in-0:
          destination: destination1
          group: group1
Here is the configuration code (in a class annotated with @Configuration):
@Bean
public Consumer<MyStream> destination1() {
    return message -> {
        if (true) throw new RuntimeException("test");
        LOGGER.info("message: " + message);
    };
}

@ServiceActivator(inputChannel = "destination1.group1.errors")
public void errorHandle(Message<?> message) {
    // TODO
    LOGGER.error("Handling ERROR: " + message);
    throw new RuntimeException("test2");
}

@Bean
public PublishSubscribeChannel errorChannel() {
    ErrorHandler errorHandler = throwable -> LOGGER.error("Handling ERROR in errorChannel error handler: ");
    Executor executor = new ConcurrentTaskExecutor();
    PublishSubscribeChannel publishSubscribeChannel = new PublishSubscribeChannel(executor);
    publishSubscribeChannel.setErrorHandler(errorHandler);
    return publishSubscribeChannel;
}
Note that steps 1 and 2 work. What is missing is step 3. After the log "Handling ERROR: ..." I see another INFO log:
a.i.k.KinesisMessageDrivenChannelAdapter : Got an exception during sending a 'GenericMessage [payload=byte[142], headers={aws_shard=shardId-000000000000, id=bb56042b-240f-43b7-4c69-90de463bfeca, sourceData={SequenceNumber: 49645044161965514128638937043897685293310641448383676418,ApproximateArrivalTimestamp: Fri Oct 06 12:41:37 CEST 2023,Data: java.nio.HeapByteBuffer[pos=0 lim=142 cap=142],PartitionKey: 2007189807,}, aws_receivedPartitionKey=2007189807, aws_receivedStream=mpeNotificationFraud, aws_receivedSequenceNumber=49645044161965514128638937043897685293310641448383676418, timestamp=1696588898426}]'
for the '{SequenceNumber: 49645044161965514128638937043897685293310641448383676418,ApproximateArrivalTimestamp: Fri Oct 06 12:41:37 CEST 2023,Data: java.nio.HeapByteBuffer[pos=0 lim=142 cap=142],PartitionKey: 2007189807,}'.
Consider to use 'errorChannel' flow for the compensation logic.
Then comes this exception (Caused by: java.lang.RuntimeException: test2):
org.springframework.messaging.MessageHandlingException: error occurred during processing message in 'MethodInvokingMessageProcessor' [org.springframework.integration.handler.MethodInvokingMessageProcessor@d13016c]; nested exception is java.lang.RuntimeException: test2
at org.springframework.integration.support.utils.IntegrationUtils.wrapInHandlingExceptionIfNecessary(IntegrationUtils.java:191) ~[spring-integration-core-5.5.19.jar:5.5.19]
at org.springframework.integration.handler.MethodInvokingMessageProcessor.processMessage(MethodInvokingMessageProcessor.java:113) ~[spring-integration-core-5.5.19.jar:5.5.19]
What I want is for the errorChannel I configured to kick in, so that in this simple example it logs "Handling ERROR in errorChannel error handler".
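To make the intended behavior concrete, here is a minimal plain-Java sketch of the three-step chain described above: the consumer fails with "test", the per-stream handler logs and fails with "test2", and a global handler receives that second failure. It uses no Spring classes at all; `ErrorFlowSketch`, `withFallbacks` and the handler names are hypothetical and only model the desired order of events, not how Spring Cloud Stream routes errors.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

public class ErrorFlowSketch {

    // Records what each stage did, so the order of events is visible.
    static final List<String> events = new ArrayList<>();

    // Wraps a consumer so that its failures go to a per-stream handler first,
    // and failures of that handler go to a global handler (hypothetical helper).
    static <T> Consumer<T> withFallbacks(Consumer<T> consumer,
                                         Consumer<RuntimeException> streamErrorHandler,
                                         Consumer<RuntimeException> globalErrorHandler) {
        return message -> {
            try {
                consumer.accept(message);                      // step 1
            } catch (RuntimeException streamError) {
                try {
                    streamErrorHandler.accept(streamError);    // step 2
                } catch (RuntimeException handlerError) {
                    globalErrorHandler.accept(handlerError);   // step 3
                }
            }
        };
    }

    static List<String> run() {
        events.clear();
        // Mirrors the question: the consumer always throws "test".
        Consumer<String> destination1 = message -> {
            throw new RuntimeException("test");
        };
        // Mirrors the question: the per-stream handler logs, then throws "test2".
        Consumer<RuntimeException> streamHandler = e -> {
            events.add("stream handler: " + e.getMessage());
            throw new RuntimeException("test2");
        };
        Consumer<RuntimeException> globalHandler =
                e -> events.add("global handler: " + e.getMessage());

        withFallbacks(destination1, streamHandler, globalHandler).accept("payload");
        return new ArrayList<>(events);
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

In the actual application only the first two events occur; the exception thrown by the per-stream handler surfaces in the stack trace above instead of reaching the configured errorChannel.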
1 Answer
See the documentation for how errors must be handled from now on: https://docs.spring.io/spring-cloud-stream/docs/current/reference/html/spring-cloud-stream.html#_handle_error_messages
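For orientation, the linked section describes registering error handlers by bean name through binding properties rather than through a @ServiceActivator on an error channel. A sketch of how that might look for the binding in the question, assuming a Spring Cloud Stream version that supports the `error-handler-definition` property. The bean names `destination1ErrorHandler` and `globalErrorHandler` are hypothetical; each would be a `Consumer<ErrorMessage>` bean. Whether the Kinesis binder honors these properties should be checked against its own documentation.

```yaml
spring:
  cloud:
    stream:
      default:
        # Assumption: used for bindings that declare no handler of their own
        error-handler-definition: globalErrorHandler
      bindings:
        destination1-in-0:
          destination: destination1
          group: group1
          # Assumption: name of a Consumer<ErrorMessage> bean for this binding
          error-handler-definition: destination1ErrorHandler
```

Note that the default handler is described as applying to bindings without a specific handler. It is not documented as a second stage that catches exceptions thrown by a per-binding handler, so step 3 of the question may still need explicit logic inside the per-binding handler.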