配置详细信息:
<int:publish-subscribe-channel id="toKafka"/>
<int:publish-subscribe-channel id="sendMessageToKafkaChannel"/>
<int:service-activator input-channel="toKafka" output-channel="sendMessageToKafkaChannel" order="1" ref="conditionalProducerService" method="producerCircuitBreaker">
<int:request-handler-advice-chain>
<ref bean="circuitBreakerAdvice" />
</int:request-handler-advice-chain>
</int:service-activator>
<int-kafka:outbound-channel-adapter id="kafkaOutboundChannelAdapter" kafka-producer-context-ref="producerContext"
auto-startup="true" channel="toKafka" message-key="kafka_messageKey"/>
<bean id="circuitBreakerAdvice" class="org.springframework.integration.handler.advice.RequestHandlerCircuitBreakerAdvice">
<property name="threshold" value="2"/>
<property name="halfOpenAfter" value="15000" />
</bean>
public Message<?> producerCircuitBreaker(Message<?> payload) {
throw new RuntimeException("foo Pro");
}
for(int i=0;i<4;i++){
toKafka.send(MessageBuilder
.withPayload(messageVO.getMessageContentVO())
.setHeader(KafkaHeaders.TOPIC, topic)
.setHeader(KafkaHeaders.PARTITION_ID,Integer.parseInt(messageVO.getPartition())).
build());
APPLOGGER.info("sending message");
}
期望进程失败2次,出现异常,然后出现“断路器断开”异常,但在控制台中抛出下面的异常后,它就停止了。
另外,我们如何在这里配置错误通道。
https://gist.github.com/anonymous/67aae50e548c78470cd0
更新配置:
<int:service-activator input-channel="toKafka" ref="gw">
<int:request-handler-advice-chain> <ref bean="circuitBreakerAdvice"/>
</int:request-handler-advice-chain>
</int:service-activator>
<int:channel id="failedChannel1" />
<int:gateway id="gw" default-request-channel="toKafka" default-reply-timeout="0" error-channel="failedChannel1" />
<int:chain input-channel="failedChannel1">
<int:transformer expression="'failed:'+payload.failedMessage.payload+ ' with a' +payload.cause.message" />
<int-stream:stderr-channel-adapter append-newline="true"/>
</int:chain>
低于异常。
failed:testvo[数据=示例消息]]无法处理消息。
https://gist.github.com/anonymous/921be7691c41d125dc84
但是,它正在处理相同的消息(消息内容被有意更改)
还尝试将生产者上下文的无效值:例如broker-list/value类类型作为无效类类型,如下所示。
得到下面的错误,但希望得到cb进入图片和消息应该流到错误通道。
在值类类型的情况下:cb没有被调用,但是消息流到错误通道,但是对于发布的1条消息有很多消息。
failed:testvo [data={tes message}}]找不到能够从xx..vo.testvo类型转换为java.lang.string类型的转换器
控制台中多次出现这种情况。
对于代理列表:它只是在控制台中抛出异常。
https://gist.github.com/anonymous/6ece517fb5e82ac73492
期望:cb被调用,并且消息流在所有情况下都指向错误通道。
<int-kafka:outbound-channel-adapter id="kafkaOutboundChannelAdapter" kafka-producer-context-ref="producerContext"
auto-startup="true" channel="toKafka" message-key="kafka_messageKey"/>
<int-kafka:producer-context id="producerContext" producer-properties="producerProperties">
<int-kafka:producer-configurations>
<int-kafka:producer-configuration
broker-list="1.2.3:9092" topic="headers['topic']" key-class-type="java.lang.String"
value-class-type="java.lang.String"
value-encoder="kafkaEncoder" key-encoder="kafkaKeyEncoder"
compression-type="none" />
</int-kafka:producer-configurations>
</int-kafka:producer-context>
1条答案
按热度按时间fhity93d1#
有了这些代码,你需要
try {...}
周围send()
.前两次尝试会抓住你的机会
RuntimeException
; 下一个将捕获断路器异常。使用带有错误通道的消息传递网关,而不是直接发送到通道。
编辑
此代码。。。
当您向发送消息时
toKafka
,将调用将消息发送到的网关toKafka
在一个循环中。它将导致堆栈溢出。