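I am developing a Camel route with Spring Boot and the Java DSL. I created a consumer based on ActiveMQComponent
to pull messages from ActiveMQ Artemis. When I shut down my local ActiveMQ Artemis instance, the connection is lost and the route throws the following exception: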
2020-12-11 12:18:15,953 [ANGES.RECEIVER]] ERROR rContainer - Listener exception overridden by rollback exception javax.jms.JMSException: java.io.EOFException
... Then the component tries to reconnect to the broker:
2020-12-11 12:18:15,953 [ANGES.RECEIVER]] WARN rContainer - - - Setup of JMS message listener invoker failed for destination 'EXCHANGES.RECEIVER' - trying to recover. Cause: Could not roll back JMS transaction; nested exception is javax.jms.IllegalStateException: The Session is closed
2020-12-11 12:18:17,988 [ANGES.RECEIVER]] ERROR rContainer - - - Could not refresh JMS Connection for destination 'EXCHANGES.RECEIVER' - retrying using FixedBackOff{interval=5000, currentAttempts=0, maxAttempts=unlimited}. Cause: Could not connect to broker URL: tcp://localhost:61616. Reason: java.net.ConnectException: Connection refused: connect
2020-12-11 12:18:25,014 [ANGES.RECEIVER]] ERROR rContainer - - - Could not refresh JMS Connection for destination 'EXCHANGES.RECEIVER' - retrying using FixedBackOff{interval=5000, currentAttempts=1, maxAttempts=unlimited}. Cause: Could not connect to broker URL: tcp://localhost:61616. Reason: java.net.ConnectException: Connection refused: connect
2020-12-11 12:18:32,055 [ANGES.RECEIVER]] ERROR rContainer - - - Could not refresh JMS Connection for destination 'EXCHANGES.RECEIVER' - retrying using FixedBackOff{interval=5000, currentAttempts=2, maxAttempts=unlimited}. Cause: Could not connect to broker URL: tcp://localhost:61616. Reason: java.net.ConnectException: Connection refused: connect...
What I need is to handle that exception and send a message to the route "direct:mailNotification". I tried setting an errorHandler
on my component (see the line amqComp.setErrorHandler(myerror);
in this code):
import javax.jms.ConnectionFactory;

import org.apache.activemq.RedeliveryPolicy;
import org.apache.activemq.camel.component.ActiveMQComponent;
import org.apache.activemq.spring.ActiveMQConnectionFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jms.connection.JmsTransactionManager;
import org.springframework.transaction.PlatformTransactionManager;

@Configuration
public class AmqConfigurationConsumer {

    @Autowired
    private AmqConsumer consumerBase;

    @Autowired
    private MyAMQErrorHandler myerror;

    @Bean
    public ConnectionFactory connectionFactory() {
        String brokerURL;
        if (consumerBase.getHostNameFailover() == null || consumerBase.getHostNameFailover().equals("")) {
            brokerURL = "tcp://" + consumerBase.getHostName() + ":" + consumerBase.getPort();
        } else {
            brokerURL = "failover:(tcp://" + consumerBase.getHostName() + ":" + consumerBase.getPort()
                    + ",tcp://" + consumerBase.getHostNameFailover() + ":" + consumerBase.getPortFailover()
                    + ")?maxReconnectAttempts=3";
        }
        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory();
        connectionFactory.setBrokerURL(brokerURL);
        connectionFactory.setUserName(consumerBase.getUser());
        connectionFactory.setPassword(consumerBase.getPasswd());
        RedeliveryPolicy policy = connectionFactory.getRedeliveryPolicy();
        policy.setInitialRedeliveryDelay(consumerBase.getInitialRedeliveryDelay());
        policy.setRedeliveryDelay(consumerBase.getRedeliveryDelay());
        policy.setBackOffMultiplier(consumerBase.getBackOffMultiplier());
        policy.setUseExponentialBackOff(consumerBase.isUseExponentialBackOff());
        policy.setMaximumRedeliveries(consumerBase.getMaximumRedeliveries());
        return connectionFactory;
    }

    @Bean(name = "activemq-component")
    public ActiveMQComponent amqpComponent() {
        ActiveMQComponent amqComp = new ActiveMQComponent();
        amqComp.setTransacted(consumerBase.isTransacted());
        amqComp.setTransactionManager(txManager());
        amqComp.setCacheLevelName("CACHE_CONSUMER");
        amqComp.setConnectionFactory(connectionFactory());
        amqComp.setConcurrentConsumers(consumerBase.getConcurrentConsumers());
        amqComp.setErrorHandler(myerror);
        return amqComp;
    }

    @Bean
    public PlatformTransactionManager txManager() {
        JmsTransactionManager jmsTransactionManager = new JmsTransactionManager(connectionFactory());
        return jmsTransactionManager;
    }
}
And my custom ErrorHandler
implementation:
import org.apache.log4j.Logger;
import org.springframework.stereotype.Component;
import org.springframework.util.ErrorHandler;

@Component("myAMQErrorHandler")
public class MyAMQErrorHandler implements ErrorHandler {

    static final Logger logger = Logger.getLogger("wk-currencyratefacelecLogger");

    @Override
    public void handleError(Throwable t) {
        logger.info("Watch this exception: " + t.getCause());
        System.out.println("Watch this other exception: " + t.getCause());
    }
}
My route is built like this (onException
does not work either):
import java.net.ConnectException;
import java.net.SocketTimeoutException;

import javax.jms.JMSException;

import org.apache.camel.LoggingLevel;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

import com.avianca.esb.wkcurrencyratefacelec.properties.AmqConsumer;
import com.avianca.esb.wkcurrencyratefacelec.properties.MailProperties;
import com.avianca.esb.wkcurrencyratefacelec.configurator.ConfigurationRoute;

@Component
public class AMQPConsumerRoute extends ConfigurationRoute {

    @Autowired
    private AmqConsumer amqConsumerConfig;

    @Autowired
    private MailProperties mailConfig;

    @Value("${application}")
    private String application;

    public void configure() throws Exception {
        super.configure();

        onException(ConnectException.class)
            .maximumRedeliveries(5)
            .redeliveryDelay(2000)
            .log(LoggingLevel.ERROR, "ESB-TEC-04 Error: No se pudo establecer comunicación con AMQ.")
            .log(LoggingLevel.ERROR, "ExceptionClass: ${exchangeProperty.CamelExceptionCaught.class}")
            .log(LoggingLevel.ERROR, "ExceptionClassName: ${exchangeProperty.CamelExceptionCaught.class.name}")
            .log(LoggingLevel.ERROR, "StackTrace: ${exception.stacktrace}")
            .setHeader("mailErrorDescription", simple(mailConfig.getErrorConexion().toString() + ": AMQ, cola " + amqConsumerConfig.getQueueName()))
            .to("direct:mailNotification")
            .end();

        onException(SocketTimeoutException.class)
            .maximumRedeliveries(5)
            .redeliveryDelay(2000)
            .log(LoggingLevel.ERROR, "ESB-TEC-04 Error: No se pudo establecer comunicación con AMQ.")
            .log(LoggingLevel.ERROR, "ExceptionClass: ${exchangeProperty.CamelExceptionCaught.class}")
            .log(LoggingLevel.ERROR, "ExceptionClassName: ${exchangeProperty.CamelExceptionCaught.class.name}")
            .log(LoggingLevel.ERROR, "StackTrace: ${exception.stacktrace}")
            .setHeader("mailErrorDescription", simple(mailConfig.getErrorConexion().toString() + ": AMQ, cola " + amqConsumerConfig.getQueueName()))
            .to("direct:mailNotification")
            .end();

        onException(JMSException.class, javax.jms.IllegalStateException.class).handled(true)
            .maximumRedeliveries(5)
            .redeliveryDelay(2000)
            .log(LoggingLevel.ERROR, "AMQ-01 El mensaje no ha sido almacenado en la cola presenta errores en la ruta ${routeId}")
            .log(LoggingLevel.ERROR, "ExceptionClass: ${exchangeProperty.CamelExceptionCaught.class}")
            .log(LoggingLevel.ERROR, "ExceptionClassName: ${exchangeProperty.CamelExceptionCaught.class.name}")
            .log(LoggingLevel.ERROR, "StackTrace: ${exception.stacktrace}")
            .end();

        from("activemq-component:queue://" + amqConsumerConfig.getQueueName()
                + "?selector=app='" + application.toUpperCase()
                + "' OR app='" + application.toLowerCase() + "'")
            .routeId("wkcurrencyratefacelec_amqp_consumer")
            .log("Inicia ruta")
            .log("${body}")
            .to("direct:transformationRoute")
            .end();
    }
}
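But when I start the route and shut down my Artemis instance, none of these logs are printed, so I assume the error handler is not working. If it worked, I would use a ProducerTemplate to send the notification,
but first I need to know how to implement the error handler correctly.

Update: I set an exceptionListener on the ActiveMQConnectionFactory. It works when the route loses the connection after it has started: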
connectionFactory.setExceptionListener(new ExceptionListener() {
    @Override
    public void onException(JMSException ex) {
        if (ex.getCause() != null && ex.getCause().toString().equals("java.io.EOFException")) {
            logger.error("Mensaje: " + ex.getMessage());
            logger.error("Name: " + ex.getClass().getName());
            Exception exception = new Exception("Se ha perdido la conexión al AMQ origen. Por favor, requerimos de su intervención lo antes posible. Gracias.", new Throwable("Error de conexión."));
            template.send("direct:mailNotification", new Processor() {
                @Override
                public void process(Exchange exc) throws Exception {
                    exc.getIn().setBody("aviso");
                    exc.setProperty(Exchange.EXCEPTION_CAUGHT, exception);
                    exc.getIn().setHeader("mailErrorDescription", "Lost Connection from origin AMQ");
                }
            });
        } else {
            logger.error("Mensaje no JMS Connect: " + ex.getMessage());
            logger.error("Name no JMS Connect: " + ex.getClass().getName());
        }
    }
});
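A side note on the condition in the listener above: comparing ex.getCause().toString() with "java.io.EOFException" matches only when the direct cause is an EOFException that carries no detail message. A minimal sketch of a less brittle check, assuming the goal is to recognise a dropped connection however deeply it is nested, is to walk the cause chain and test each element by type. CauseChain, hasCause and looksLikeLostConnection are hypothetical names, not part of Camel, Spring or ActiveMQ, and counting ConnectException as a lost connection is also an assumption.

```java
import java.io.EOFException;
import java.net.ConnectException;

// Hypothetical helper for illustration; not part of Camel, Spring or ActiveMQ.
final class CauseChain {

    // Upper bound on how far to follow getCause(), as a guard against cyclic chains.
    private static final int MAX_DEPTH = 50;

    private CauseChain() {
    }

    /** Returns true if t itself, or any of its causes, is an instance of type. */
    static boolean hasCause(Throwable t, Class<? extends Throwable> type) {
        int depth = 0;
        for (Throwable cur = t; cur != null && depth < MAX_DEPTH; cur = cur.getCause(), depth++) {
            if (type.isInstance(cur)) {
                return true;
            }
        }
        return false;
    }

    /** Assumption: an EOFException or ConnectException anywhere in the chain means the broker link is down. */
    static boolean looksLikeLostConnection(Throwable t) {
        return hasCause(t, EOFException.class) || hasCause(t, ConnectException.class);
    }
}
```

With such a helper, the if condition in the listener could become CauseChain.looksLikeLostConnection(ex).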
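However, there are two additional issues:
1. The listener does not fire when the route starts and cannot connect to AMQ at all (is there any way to handle that case?).
2. Once the route detects the lost connection, it sends the message once and stops. Can I hook into the retries so that the same message is sent every x retries? I couldn't find any way to modify the FixedBackOff with this implementation.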
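One possible way to approach both issues without touching the container's FixedBackOff is a separate watchdog that probes the broker port on its own schedule and raises an alert on every Nth consecutive failure. The sketch below only illustrates that idea under stated assumptions and is not a Camel or Spring API: BrokerWatchdog, check, tcpReachable and alertEvery are hypothetical names, and a successful TCP connect only shows that the port is open, not that the broker accepts JMS sessions.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.util.function.BooleanSupplier;

// Hypothetical watchdog for illustration; not part of Camel, Spring or ActiveMQ.
final class BrokerWatchdog {

    private final BooleanSupplier probe; // returns true when the broker is considered reachable
    private final Runnable onAlert;      // e.g. a wrapper around the mail notification call
    private final int alertEvery;        // alert on failures N, 2N, 3N, ...
    private int consecutiveFailures;

    BrokerWatchdog(BooleanSupplier probe, Runnable onAlert, int alertEvery) {
        if (alertEvery < 1) {
            throw new IllegalArgumentException("alertEvery must be >= 1");
        }
        this.probe = probe;
        this.onAlert = onAlert;
        this.alertEvery = alertEvery;
    }

    /** Runs one probe and returns its result; fires onAlert on every alertEvery-th consecutive failure. */
    synchronized boolean check() {
        if (probe.getAsBoolean()) {
            consecutiveFailures = 0; // broker is back, start counting from scratch
            return true;
        }
        consecutiveFailures++;
        if (consecutiveFailures % alertEvery == 0) {
            onAlert.run();
        }
        return false;
    }

    /** Plain TCP probe: true if a socket to host:port can be opened within timeoutMillis. */
    static boolean tcpReachable(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            return false;
        }
    }
}
```

check() could be called from a ScheduledExecutorService or a timer-driven route at the same 5 second interval shown in the logs, with the probe set to () -> BrokerWatchdog.tcpReachable(host, port, 2000) and the alert wrapping the template.send("direct:mailNotification", ...) call from the update above. Because it runs independently of the JMS listener, it would also fire when the broker is already down at startup.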