How do I correctly configure errorChannel in Spring Cloud (Spring Integration)?

Posted by niwlg2el on 2023-10-15 in Spring

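I have a web application in which I use a Kinesis stream with spring-cloud-stream, spring-cloud-stream-binder-kinesis and spring-cloud-starter-sleuth. I want to configure error handling correctly. Specifically, I want to:
1. Process the message stream.
2. If an error occurs in step 1, handle it (separately for each stream).
3. Create an errorChannel that globally handles every error raised in step 2.
Properties: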

spring:
  cloud:
    stream:
      function:
        definition: destination1
      bindings:
        destination1-in-0:
          destination: destination1
          group: group1

Here is the configuration code (in a class annotated with @Configuration):

@Bean
    public Consumer<MyStream> destination1() {
        return message -> {
            if (true) throw new RuntimeException("test");
            LOGGER.info("message: " + message);
        };
    }

    @ServiceActivator(inputChannel = "destination1.group1.errors")
    public void errorHandle(Message<?> message) {
        // TODO
        LOGGER.error("Handling ERROR: " + message);
        throw new RuntimeException("test2");
    }

    @Bean
    public PublishSubscribeChannel errorChannel() {
        ErrorHandler errorHandler = throwable -> LOGGER.error("Handling ERROR in errorChannel error handler: ");
        Executor executor = new ConcurrentTaskExecutor();
        PublishSubscribeChannel publishSubscribeChannel = new PublishSubscribeChannel(executor);
        publishSubscribeChannel.setErrorHandler(errorHandler);
        return publishSubscribeChannel;
    }

Note that steps 1 and 2 work; what is missing is step 3. After the "Handling ERROR: ..." log line, I see another INFO log:

a.i.k.KinesisMessageDrivenChannelAdapter : Got an exception during sending a 'GenericMessage [payload=byte[142], headers={aws_shard=shardId-000000000000, id=bb56042b-240f-43b7-4c69-90de463bfeca, sourceData={SequenceNumber: 49645044161965514128638937043897685293310641448383676418,ApproximateArrivalTimestamp: Fri Oct 06 12:41:37 CEST 2023,Data: java.nio.HeapByteBuffer[pos=0 lim=142 cap=142],PartitionKey: 2007189807,}, aws_receivedPartitionKey=2007189807, aws_receivedStream=mpeNotificationFraud, aws_receivedSequenceNumber=49645044161965514128638937043897685293310641448383676418, timestamp=1696588898426}]'
for the '{SequenceNumber: 49645044161965514128638937043897685293310641448383676418,ApproximateArrivalTimestamp: Fri Oct 06 12:41:37 CEST 2023,Data: java.nio.HeapByteBuffer[pos=0 lim=142 cap=142],PartitionKey: 2007189807,}'.
Consider to use 'errorChannel' flow for the compensation logic.

Then this exception follows (Caused by: java.lang.RuntimeException: test2):

org.springframework.messaging.MessageHandlingException: error occurred during processing message in 'MethodInvokingMessageProcessor' [org.springframework.integration.handler.MethodInvokingMessageProcessor@d13016c]; nested exception is java.lang.RuntimeException: test2
    at org.springframework.integration.support.utils.IntegrationUtils.wrapInHandlingExceptionIfNecessary(IntegrationUtils.java:191) ~[spring-integration-core-5.5.19.jar:5.5.19]
    at org.springframework.integration.handler.MethodInvokingMessageProcessor.processMessage(MethodInvokingMessageProcessor.java:113) ~[spring-integration-core-5.5.19.jar:5.5.19]

What I want is for the errorChannel I configured to take effect, so that in this simple example it logs "Handling ERROR in errorChannel error handler".

Answer 1 (xbp102n0)

See the documentation for how errors must be handled from now on: https://docs.spring.io/spring-cloud-stream/docs/current/reference/html/spring-cloud-stream.html#_handle_error_messages

@Bean
public Consumer<ErrorMessage> myErrorHandler() {
    return errorMessage -> LOGGER.error("Handling ERROR: " + errorMessage);
}

spring.cloud.stream.bindings.destination1-in-0.error-handler-definition=myErrorHandler
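For reference, the same property can be written in the YAML layout used in the question. This is only a sketch: it assumes a Spring Cloud Stream version that supports error-handler-definition (the question's stack trace shows spring-integration-core 5.5.19, which may predate it), and it reuses the myErrorHandler bean name from above.

```yaml
spring:
  cloud:
    stream:
      function:
        definition: destination1
      bindings:
        destination1-in-0:
          destination: destination1
          group: group1
          # Route failures from this binding to the Consumer<ErrorMessage> bean named myErrorHandler
          error-handler-definition: myErrorHandler
```

If I read the linked documentation correctly, a single handler for all bindings (closer to the "global" step 3 in the question) can be declared through the default-properties mechanism instead, as spring.cloud.stream.default.error-handler-definition=myErrorHandler. Check that against the reference for the version you actually run.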
