当activemqcomponent抛出connectexception时执行自定义errorhandler

jckbn6z7  于 2021-06-30  发布在  Java
关注(0)|答案(0)|浏览(273)

我正在用spring boot和javadsl开发一个camel路由。我创建了一个基于 ActiveMQComponent 以便从activemq artemis中提取消息。当我关闭本地activemq artemis时,连接将丢失,路由将引发以下异常:

2020-12-11 12:18:15,953 [ANGES.RECEIVER]] ERROR rContainer - Listener exception overridden by rollback exception javax.jms.JMSException: java.io.EOFException

... 然后组件再次尝试连接代理:

2020-12-11 12:18:15,953 [ANGES.RECEIVER]] WARN  rContainer -            -                      - Setup of JMS message listener invoker failed for destination 'EXCHANGES.RECEIVER' - trying to recover. Cause: Could not roll back JMS transaction; nested exception is javax.jms.IllegalStateException: The Session is closed
2020-12-11 12:18:17,988 [ANGES.RECEIVER]] ERROR rContainer -            -                      - Could not refresh JMS Connection for destination 'EXCHANGES.RECEIVER' - retrying using FixedBackOff{interval=5000, currentAttempts=0, maxAttempts=unlimited}. Cause: Could not connect to broker URL: tcp://localhost:61616. Reason: java.net.ConnectException: Connection refused: connect
2020-12-11 12:18:25,014 [ANGES.RECEIVER]] ERROR rContainer -            -                      - Could not refresh JMS Connection for destination 'EXCHANGES.RECEIVER' - retrying using FixedBackOff{interval=5000, currentAttempts=1, maxAttempts=unlimited}. Cause: Could not connect to broker URL: tcp://localhost:61616. Reason: java.net.ConnectException: Connection refused: connect
2020-12-11 12:18:32,055 [ANGES.RECEIVER]] ERROR rContainer -            -                      - Could not refresh JMS Connection for destination 'EXCHANGES.RECEIVER' - retrying using FixedBackOff{interval=5000, currentAttempts=2, maxAttempts=unlimited}. Cause: Could not connect to broker URL: tcp://localhost:61616. Reason: java.net.ConnectException: Connection refused: connect...

我需要的是处理该异常并向路由发送消息“direct:mailnotification". 我想定个规矩 errorHandler 在我的组件中(请参见第行 amqComp.setErrorHandler(myerror); 在本规范中):

import javax.jms.ConnectionFactory;
import org.apache.activemq.RedeliveryPolicy;
import org.apache.activemq.camel.component.ActiveMQComponent;
import org.apache.activemq.spring.ActiveMQConnectionFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jms.connection.JmsTransactionManager;
import org.springframework.transaction.PlatformTransactionManager;

@Configuration
public class AmqConfigurationConsumer {

    @Autowired
    private AmqConsumer consumerBase;

    @Autowired
    private MyAMQErrorHandler myerror;

    @Bean
    public ConnectionFactory connectionFactory(){
        String brokerURL;

        if(consumerBase.getHostNameFailover() == null || consumerBase.getHostNameFailover().equals("")) {
            brokerURL = "tcp://" + consumerBase.getHostName() + ":" + consumerBase.getPort(); 
        } else {
            brokerURL = "failover:(tcp://" + consumerBase.getHostName() + ":" + consumerBase.getPort() 
            + ",tcp://" + consumerBase.getHostNameFailover() + ":" + consumerBase.getPortFailover() + ")?maxReconnectAttempts=3"; 
        }       
        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory();
        connectionFactory.setBrokerURL(brokerURL);
        connectionFactory.setUserName(consumerBase.getUser());
        connectionFactory.setPassword(consumerBase.getPasswd());

        RedeliveryPolicy policy = connectionFactory.getRedeliveryPolicy();
        policy.setInitialRedeliveryDelay(consumerBase.getInitialRedeliveryDelay());
        policy.setRedeliveryDelay(consumerBase.getRedeliveryDelay());
        policy.setBackOffMultiplier(consumerBase.getBackOffMultiplier());
        policy.setUseExponentialBackOff(consumerBase.isUseExponentialBackOff());
        policy.setMaximumRedeliveries(consumerBase.getMaximumRedeliveries());
        return connectionFactory;
    }

    @Bean(name = "activemq-component")
    public ActiveMQComponent amqpComponent() {
        ActiveMQComponent amqComp= new ActiveMQComponent();
        amqComp.setTransacted(consumerBase.isTransacted());
        amqComp.setTransactionManager(txManager());
        amqComp.setCacheLevelName("CACHE_CONSUMER");
        amqComp.setConnectionFactory(connectionFactory());
        amqComp.setConcurrentConsumers(consumerBase.getConcurrentConsumers());
        amqComp.setErrorHandler(myerror);
        return amqComp;
    }

    @Bean 
    public PlatformTransactionManager txManager() {
        JmsTransactionManager jmsTransactionManager = new JmsTransactionManager(connectionFactory());
        return jmsTransactionManager;
    }
}

还有我的习惯 ErrorHandler 实施:

import org.apache.log4j.Logger;
import org.springframework.stereotype.Component;
import org.springframework.util.ErrorHandler;

@Component("myAMQErrorHandler")
public class MyAMQErrorHandler implements ErrorHandler {

    static final Logger logger = Logger.getLogger("wk-currencyratefacelecLogger");

    @Override
    public void handleError(Throwable t) {
        logger.info("Watch this exception: "+t.getCause());
        System.out.println("Watch this other exception: "+t.getCause());
    }
}

我的路线是这样建造的 onException 也不起作用):

import java.net.ConnectException;
import java.net.SocketTimeoutException;

import javax.jms.JMSException;

import org.apache.camel.LoggingLevel;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import com.avianca.esb.wkcurrencyratefacelec.properties.AmqConsumer;
import com.avianca.esb.wkcurrencyratefacelec.properties.MailProperties;
import com.avianca.esb.wkcurrencyratefacelec.configurator.ConfigurationRoute;

@Component
public class AMQPConsumerRoute extends ConfigurationRoute {

    @Autowired
    private AmqConsumer amqConsumerConfig;

    @Autowired
    private MailProperties mailConfig;

    @Value("${application}")
    private String application;

    public void configure() throws Exception {
        super.configure();

        onException(ConnectException.class)
            .maximumRedeliveries(5)
            .redeliveryDelay(2000)
            .log(LoggingLevel.ERROR, "ESB-TEC-04 Error: No se pudo establecer comunicación con AMQ.")
            .log(LoggingLevel.ERROR, "ExceptionClass: ${exchangeProperty.CamelExceptionCaught.class}")
            .log(LoggingLevel.ERROR, "ExceptionClassName: ${exchangeProperty.CamelExceptionCaught.class.name}")
            .log(LoggingLevel.ERROR, "StackTrace: ${exception.stacktrace}")
            .setHeader("mailErrorDescription", simple(mailConfig.getErrorConexion().toString()+": AMQ, cola "+amqConsumerConfig.getQueueName()))
            .to("direct:mailNotification")
        .end();

        onException(SocketTimeoutException.class)
            .maximumRedeliveries(5)
            .redeliveryDelay(2000)
            .log(LoggingLevel.ERROR, "ESB-TEC-04 Error: No se pudo establecer comunicación con AMQ.")
            .log(LoggingLevel.ERROR, "ExceptionClass: ${exchangeProperty.CamelExceptionCaught.class}")
            .log(LoggingLevel.ERROR, "ExceptionClassName: ${exchangeProperty.CamelExceptionCaught.class.name}")
            .log(LoggingLevel.ERROR, "StackTrace: ${exception.stacktrace}")
            .setHeader("mailErrorDescription", simple(mailConfig.getErrorConexion().toString()+": AMQ, cola "+amqConsumerConfig.getQueueName()))
            .to("direct:mailNotification")
        .end();

        onException(JMSException.class, javax.jms.IllegalStateException.class).handled(true)
            .maximumRedeliveries(5)
            .redeliveryDelay(2000)
            .log(LoggingLevel.ERROR, "AMQ-01 El mensaje no ha sido almacenado en la cola presenta errores en la ruta ${routeId}")
            .log(LoggingLevel.ERROR, "ExceptionClass: ${exchangeProperty.CamelExceptionCaught.class}")
            .log(LoggingLevel.ERROR, "ExceptionClassName: ${exchangeProperty.CamelExceptionCaught.class.name}")
            .log(LoggingLevel.ERROR, "StackTrace: ${exception.stacktrace}")
        .end();

        from("activemq-component:queue://" + amqConsumerConfig.getQueueName()+"?selector=app='"+application.toUpperCase()+"' OR app='"+application.toLowerCase()+"'").routeId("wkcurrencyratefacelec_amqp_consumer")
            .log("Inicia ruta").log("${body}")
            .to("direct:transformationRoute")
        .end();
    }
}

但是当我启动路由并关闭我的artemis示例时,日志不会打印,所以我假设错误处理程序不工作。如果可行的话,我会实施 ProducerTemplate 但是首先我需要知道如何正确地实现错误处理程序。
更新:我修改了 exceptionListenerActiveMQConnectionFactory . 启动后路由失去连接时工作:

connectionFactory.setExceptionListener(new ExceptionListener() {
            @Override
            public void onException(JMSException ex) {
                if (ex.getCause() != null && ex.getCause().toString().equals("java.io.EOFException")) {
                    logger.error("Mensaje: "+ex.getMessage());
                    logger.error("Name: "+ex.getClass().getName());
                    Exception exception = new Exception("Se ha perdido la conexión al AMQ origen. Por favor, requerimos de su intervención lo antes posible. Gracias.", new Throwable("Error de conexión."));
                    template.send("direct:mailNotification", new Processor() {
                        @Override
                        public void process(Exchange exc) throws Exception {
                            exc.getIn().setBody("aviso");
                            exc.setProperty(Exchange.EXCEPTION_CAUGHT, exception);
                            exc.getIn().setHeader("mailErrorDescription", "Lost Connection from origin AMQ");
                        }
                    });
                } else {
                    logger.error("Mensaje no JMS Connect: "+ex.getMessage());
                    logger.error("Name no JMS Connect: "+ex.getClass().getName());
                }
            }
        });

但是,还有两个额外的问题:1-侦听器在路由启动时不工作,并且无法连接到amq(是否存在任何控制该路由的方法?)。2-一旦路由检测到连接丢失,发送消息并停止,我是否可以处理重试,以便每x次重试发送相同的消息?。我找不到任何方法来修改 FixedBackoff 有了这种实现。

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题