使用服务激活器配置断路器

q1qsirdb  于 2021-06-08  发布在  Kafka
关注(0)|答案(1)|浏览(366)

配置详细信息:

<int:publish-subscribe-channel id="toKafka"/>

  <int:publish-subscribe-channel id="sendMessageToKafkaChannel"/>

  <int:service-activator input-channel="toKafka" output-channel="sendMessageToKafkaChannel" order="1" ref="conditionalProducerService" method="producerCircuitBreaker">
       <int:request-handler-advice-chain> 
              <ref bean="circuitBreakerAdvice" />
       </int:request-handler-advice-chain>
  </int:service-activator>

  <int-kafka:outbound-channel-adapter id="kafkaOutboundChannelAdapter" kafka-producer-context-ref="producerContext"
                                                                       auto-startup="true" channel="toKafka" message-key="kafka_messageKey"/>

  <bean id="circuitBreakerAdvice" class="org.springframework.integration.handler.advice.RequestHandlerCircuitBreakerAdvice">
    <property name="threshold" value="2"/>                      
    <property name="halfOpenAfter" value="15000" />                     
  </bean>
public Message<?> producerCircuitBreaker(Message<?> payload) {
      throw  new RuntimeException("foo Pro");
  }
for(int i=0;i<4;i++){
          toKafka.send(MessageBuilder
                              .withPayload(messageVO.getMessageContentVO())                                  
                              .setHeader(KafkaHeaders.TOPIC, topic)
                              .setHeader(KafkaHeaders.PARTITION_ID,Integer.parseInt(messageVO.getPartition())).
                              build());

                 APPLOGGER.info("sending message");
   }

期望进程失败2次,出现异常,然后出现“断路器断开”异常,但在控制台中抛出下面的异常后,它就停止了。
另外,我们如何在这里配置错误通道。
https://gist.github.com/anonymous/67aae50e548c78470cd0
更新配置:

<int:service-activator input-channel="toKafka"  ref="gw">
    <int:request-handler-advice-chain> <ref bean="circuitBreakerAdvice"/>
                      </int:request-handler-advice-chain> 
   </int:service-activator>

   <int:channel id="failedChannel1" />

   <int:gateway id="gw" default-request-channel="toKafka" default-reply-timeout="0" error-channel="failedChannel1"  />

     <int:chain input-channel="failedChannel1">
        <int:transformer expression="'failed:'+payload.failedMessage.payload+ ' with a' +payload.cause.message" />
        <int-stream:stderr-channel-adapter append-newline="true"/>
            </int:chain>

低于异常。
failed:testvo[数据=示例消息]]无法处理消息。
https://gist.github.com/anonymous/921be7691c41d125dc84
但是,它正在处理相同的消息(消息内容被有意更改)
还尝试将生产者上下文的无效值:例如broker-list/value类类型作为无效类类型,如下所示。
得到下面的错误,但希望得到cb进入图片和消息应该流到错误通道。
在值类类型的情况下:cb没有被调用,但是消息流到错误通道,但是对于发布的1条消息有很多消息。
failed:testvo [data={tes message}}]找不到能够从xx..vo.testvo类型转换为java.lang.string类型的转换器
控制台中多次出现这种情况。
对于代理列表:它只是在控制台中抛出异常。
https://gist.github.com/anonymous/6ece517fb5e82ac73492
期望:cb被调用,并且消息流在所有情况下都指向错误通道。

<int-kafka:outbound-channel-adapter id="kafkaOutboundChannelAdapter" kafka-producer-context-ref="producerContext"
                                                                       auto-startup="true" channel="toKafka" message-key="kafka_messageKey"/>  

  <int-kafka:producer-context id="producerContext" producer-properties="producerProperties">
                                                            <int-kafka:producer-configurations>
                                                                           <int-kafka:producer-configuration
                                                                                            broker-list="1.2.3:9092" topic="headers['topic']" key-class-type="java.lang.String"
                                                                                            value-class-type="java.lang.String"
                                                                                            value-encoder="kafkaEncoder" key-encoder="kafkaKeyEncoder"
                                                                                            compression-type="none" />
                                                            </int-kafka:producer-configurations>
                                            </int-kafka:producer-context>
fhity93d

fhity93d1#

有了这些代码,你需要 try {...} 周围 send() .
前两次尝试会抓住你的机会 RuntimeException ; 下一个将捕获断路器异常。
使用带有错误通道的消息传递网关,而不是直接发送到通道。
编辑
此代码。。。

<int:service-activator input-channel="toKafka"  ref="gw">
    <int:request-handler-advice-chain> <ref bean="circuitBreakerAdvice"/>
                  </int:request-handler-advice-chain> 
</int:service-activator>

<int:gateway id="gw" default-request-channel="toKafka" default-reply-timeout="0" error-channel="failedChannel1"  />

当您向发送消息时 toKafka ,将调用将消息发送到的网关 toKafka 在一个循环中。
它将导致堆栈溢出。

相关问题