Spring Boot 获取“channel has already failed”异常,使用SSL从Sping Boot 向ActiveMQ Artemis发布消息

cygmwpex  于 2023-04-20  发布在  Spring
关注(0)|答案(1)|浏览(419)

我正在尝试使用SSL通过Sping Boot 发布/侦听ActiveMQ Artemis。没有任何关于如何做到这一点的文档。
但是我在启动时尝试用channel has already failed连接ActiveMQ时出现了一个异常。我该如何识别这个问题呢?

org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'producer': Invocation of init method failed; nested exception is org.springframework.jms.UncategorizedJmsException: Uncategorized exception occurred during JMS processing; nested exception is javax.jms.JMSException: Cannot send, channel has already failed: tcp://127.0.0.1:5671
    at org.springframework.beans.factory.annotation.InitDestroyAnnotationBeanPostProcessor.postProcessBeforeInitialization(InitDestroyAnnotationBeanPostProcessor.java:160) ~[spring-beans-5.3.23.jar:5.3.23]
    at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.applyBeanPostProcessorsBeforeInitialization(AbstractAutowireCapableBeanFactory.java:440) ~[spring-beans-5.3.23.jar:5.3.23]
    at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.initializeBean(AbstractAutowireCapableBeanFactory.java:1796) ~[spring-beans-5.3.23.jar:5.3.23]
    at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.doCreateBean(AbstractAutowireCapableBeanFactory.java:620) ~[spring-beans-5.3.23.jar:5.3.23]
    at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.createBean(AbstractAutowireCapableBeanFactory.java:542) ~[spring-beans-5.3.23.jar:5.3.23]
    at org.springframework.beans.factory.support.AbstractBeanFactory.lambda$doGetBean$0(AbstractBeanFactory.java:335) ~[spring-beans-5.3.23.jar:5.3.23]
    at org.springframework.beans.factory.support.DefaultSingletonBeanRegistry.getSingleton(DefaultSingletonBeanRegistry.java:234) ~[spring-beans-5.3.23.jar:5.3.23]
    at org.springframework.beans.factory.support.AbstractBeanFactory.doGetBean(AbstractBeanFactory.java:333) ~[spring-beans-5.3.23.jar:5.3.23]
    at org.springframework.beans.factory.support.AbstractBeanFactory.getBean(AbstractBeanFactory.java:208) ~[spring-beans-5.3.23.jar:5.3.23]
    at org.springframework.beans.factory.support.DefaultListableBeanFactory.preInstantiateSingletons(DefaultListableBeanFactory.java:955) ~[spring-beans-5.3.23.jar:5.3.23]
    at org.springframework.context.support.AbstractApplicationContext.finishBeanFactoryInitialization(AbstractApplicationContext.java:918) ~[spring-context-5.3.23.jar:5.3.23]
    at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:583) ~[spring-context-5.3.23.jar:5.3.23]
    at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:734) ~[spring-boot-2.7.5.jar:2.7.5]
    at org.springframework.boot.SpringApplication.refreshContext(SpringApplication.java:408) ~[spring-boot-2.7.5.jar:2.7.5]
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:308) ~[spring-boot-2.7.5.jar:2.7.5]
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:1306) ~[spring-boot-2.7.5.jar:2.7.5]
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:1295) ~[spring-boot-2.7.5.jar:2.7.5]
    at AmqpsDemoApplication.main(AmqpsDemoApplication.java:12) ~[classes/:na]
Caused by: org.springframework.jms.UncategorizedJmsException: Uncategorized exception occurred during JMS processing; nested exception is javax.jms.JMSException: Cannot send, channel has already failed: tcp://127.0.0.1:5671
    at org.springframework.jms.support.JmsUtils.convertJmsAccessException(JmsUtils.java:311) ~[spring-jms-5.3.23.jar:5.3.23]
    at org.springframework.jms.support.JmsAccessor.convertJmsAccessException(JmsAccessor.java:185) ~[spring-jms-5.3.23.jar:5.3.23]
    at org.springframework.jms.core.JmsTemplate.execute(JmsTemplate.java:507) ~[spring-jms-5.3.23.jar:5.3.23]
    at org.springframework.jms.core.JmsTemplate.send(JmsTemplate.java:584) ~[spring-jms-5.3.23.jar:5.3.23]
    at queues.Producer.sendMessage(Producer.java:17) ~[classes/:na]
    at queues.Producer.doIt(Producer.java:22) ~[classes/:na]
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:na]
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77) ~[na:na]
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:na]
    at java.base/java.lang.reflect.Method.invoke(Method.java:568) ~[na:na]
    at org.springframework.beans.factory.annotation.InitDestroyAnnotationBeanPostProcessor$LifecycleElement.invoke(InitDestroyAnnotationBeanPostProcessor.java:389) ~[spring-beans-5.3.23.jar:5.3.23]
    at org.springframework.beans.factory.annotation.InitDestroyAnnotationBeanPostProcessor$LifecycleMetadata.invokeInitMethods(InitDestroyAnnotationBeanPostProcessor.java:333) ~[spring-beans-5.3.23.jar:5.3.23]
    at org.springframework.beans.factory.annotation.InitDestroyAnnotationBeanPostProcessor.postProcessBeforeInitialization(InitDestroyAnnotationBeanPostProcessor.java:157) ~[spring-beans-5.3.23.jar:5.3.23]
    ... 17 common frames omitted
Caused by: javax.jms.JMSException: Cannot send, channel has already failed: tcp://127.0.0.1:5671
    at org.apache.activemq.util.JMSExceptionSupport.create(JMSExceptionSupport.java:80) ~[activemq-client-5.16.5.jar:5.16.5]
    at org.apache.activemq.ActiveMQConnection.syncSendPacket(ActiveMQConnection.java:1423) ~[activemq-client-5.16.5.jar:5.16.5]
    at org.apache.activemq.ActiveMQConnection.ensureConnectionInfoSent(ActiveMQConnection.java:1488) ~[activemq-client-5.16.5.jar:5.16.5]
    at org.apache.activemq.ActiveMQConnection.createSession(ActiveMQConnection.java:332) ~[activemq-client-5.16.5.jar:5.16.5]
    at org.springframework.jms.support.JmsAccessor.createSession(JmsAccessor.java:208) ~[spring-jms-5.3.23.jar:5.3.23]
    at org.springframework.jms.core.JmsTemplate.execute(JmsTemplate.java:495) ~[spring-jms-5.3.23.jar:5.3.23]
    ... 27 common frames omitted
Caused by: org.apache.activemq.transport.InactivityIOException: Cannot send, channel has already failed: tcp://127.0.0.1:5671
    at org.apache.activemq.transport.AbstractInactivityMonitor.doOnewaySend(AbstractInactivityMonitor.java:328) ~[activemq-client-5.16.5.jar:5.16.5]
    at org.apache.activemq.transport.AbstractInactivityMonitor.oneway(AbstractInactivityMonitor.java:317) ~[activemq-client-5.16.5.jar:5.16.5]
    at org.apache.activemq.transport.TransportFilter.oneway(TransportFilter.java:94) ~[activemq-client-5.16.5.jar:5.16.5]
    at org.apache.activemq.transport.WireFormatNegotiator.oneway(WireFormatNegotiator.java:116) ~[activemq-client-5.16.5.jar:5.16.5]
    at org.apache.activemq.transport.MutexTransport.oneway(MutexTransport.java:68) ~[activemq-client-5.16.5.jar:5.16.5]
    at org.apache.activemq.transport.ResponseCorrelator.asyncRequest(ResponseCorrelator.java:81) ~[activemq-client-5.16.5.jar:5.16.5]
    at org.apache.activemq.transport.ResponseCorrelator.request(ResponseCorrelator.java:86) ~[activemq-client-5.16.5.jar:5.16.5]
    at org.apache.activemq.ActiveMQConnection.syncSendPacket(ActiveMQConnection.java:1394) ~[activemq-client-5.16.5.jar:5.16.5]
    ... 31 common frames omitted

我遵循的步骤概述如下:
我创建了一个keystore/trusted keystore,证书如下所示here

1. Using keytool, create a certificate for the broker:
keytool -genkey -alias broker -keyalg RSA -keystore broker.ks

2. Export the broker’s certificate so it can be shared with clients:
keytool -export -alias broker -keystore broker.ks -file broker_cert

3. Create a certificate/keystore for the client:
keytool -genkey -alias client -keyalg RSA -keystore client.ks

4. Create a truststore for the client, and import the broker’s certificate. This establishes that the client “trusts” the broker:
keytool -import -alias broker -keystore client.ts -file broker_cert

broker.xml中,在端口5671上添加了一个acceptor,并提供了AMQP协议+sslEnabled以及存储和密码。密钥库和信任库也已复制到代理。如果我在acceptor中更改路径值,则在启动时会出现错误,因此我认为它们位于正确的位置,代理可以访问它们。(我在最后附上了完整的broker.xml

<acceptor name="amqps">tcp://0.0.0.0:5671?protocols=AMQP;sslEnabled=true;keyStorePath=/opt/app/store/broker.ks;keyStorePassword=password;trustStorePath=/opt/app/store/client.ts;trustStorePassword=password;;enabledProtocols=TLSv1.2;tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;useEpoll=true;amqpCredits=1000;amqpMinCredits=300</acceptor>

我的Sping Boot 应用程序:

  • ActiveMQConfig.java使用keystore/trustedstore和passwords定义连接工厂
  • Producer.java使用@Autowired Spring JmsTemplate发送消息
  • AmqpsDemoApplication.java是一个空的@SpringBootApplication,使用main方法
  • application.properties为空
  • broker.ksclient.ts文件将按照上述方法从创建中复制

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.7.5</version>
        <relativePath/>
    </parent>
    <groupId>demo</groupId>
    <artifactId>amqps-demo</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <properties>
        <java.version>17</java.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-activemq</artifactId>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>

ActiveMQConfig.java设置安全连接工厂:

import org.apache.activemq.ActiveMQConnectionConsumer;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.ActiveMQSslConnectionFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jms.annotation.EnableJms;
import org.springframework.jms.core.JmsTemplate;

import javax.jms.ConnectionFactory;

@Configuration
@EnableJms
public class ActiveMQConfig {

    String BROKER_URL = "tcp://localhost:5671"; // 5671 which is my AMQP+SSL config in my broker.xml
                                                // changing the port back to 61616 still works but that bypasses SSL
    String BROKER_USERNAME = "admin";
    String BROKER_PASSWORD = "admin";

    @Bean
    public ActiveMQConnectionFactory connectionFactory() {
        final ActiveMQSslConnectionFactory f = new ActiveMQSslConnectionFactory(BROKER_URL);
        f.setUserName(BROKER_USERNAME);
        f.setPassword(BROKER_PASSWORD);
        f.setBrokerURL(BROKER_URL);

        try {
            f.setKeyStore("client.ts");
            f.setTrustStore("broker.ks");
        } catch (final Exception e) {
            throw new RuntimeException(e);
        }
        f.setKeyStorePassword("password");
        f.setTrustStorePassword("password");
        return f;
    }
}

Producer.java,尝试将消息添加到队列

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;

@Component
public class Producer {

    @Autowired
    JmsTemplate jmsTemplate;

    @PostConstruct    //Triggers publishing a dummy message after startup
    public void sendMessage() {
        System.out.println("Sending myValue");
        jmsTemplate.send("inbound.queue", session -> session.createTextMessage("myValue"));
    }
}

完整的ActiveMQ broker.xml如下:

<?xml version='1.0'?>
<configuration xmlns="urn:activemq" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:xi="http://www.w3.org/2001/XInclude" xsi:schemaLocation="urn:activemq /schema/artemis-configuration.xsd">
    <core xmlns="urn:activemq:core" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="urn:activemq:core ">
        <name>0.0.0.0</name>
        <persistence-enabled>true</persistence-enabled>
        <journal-type>NIO</journal-type>
        <paging-directory>data/paging</paging-directory>
        <bindings-directory>data/bindings</bindings-directory>
        <journal-directory>data/journal</journal-directory>
        <large-messages-directory>data/large-messages</large-messages-directory>
        <journal-datasync>true</journal-datasync>
        <journal-min-files>2</journal-min-files>
        <journal-pool-files>10</journal-pool-files>
        <journal-device-block-size>4096</journal-device-block-size>
        <journal-file-size>10M</journal-file-size>
        <journal-buffer-timeout>4040000</journal-buffer-timeout>
        <journal-max-io>1</journal-max-io>
        <disk-scan-period>5000</disk-scan-period>
        <max-disk-usage>90</max-disk-usage>
        <critical-analyzer>true</critical-analyzer>
        <critical-analyzer-timeout>120000</critical-analyzer-timeout>
        <critical-analyzer-check-period>60000</critical-analyzer-check-period>
        <critical-analyzer-policy>HALT</critical-analyzer-policy>
        <page-sync-timeout>4040000</page-sync-timeout>
        <global-max-messages>-1</global-max-messages>
        <acceptors>
            <!-- AMQPS Acceptor.  Here the stores and passwords are specified.-->
            <acceptor name="amqps">tcp://0.0.0.0:5671?protocols=AMQP;sslEnabled=true;keyStorePath=/opt/app/store/broker.ks;keyStorePassword=password;trustStorePath=/opt/app/store/client.ts;trustStorePassword=password;;enabledProtocols=TLSv1.2;tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;useEpoll=true;amqpCredits=1000;amqpMinCredits=300</acceptor>
            <acceptor name="amqp">tcp://0.0.0.0:5672?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=AMQP;useEpoll=true;amqpCredits=1000;amqpLowCredits=300;amqpMinLargeMessageSize=102400;amqpDuplicateDetection=true</acceptor>
        </acceptors>
        <security-settings>
            <security-setting match="#">
                <permission type="createNonDurableQueue" roles="amq"/>
                <permission type="deleteNonDurableQueue" roles="amq"/>
                <permission type="createDurableQueue" roles="amq"/>
                <permission type="deleteDurableQueue" roles="amq"/>
                <permission type="createAddress" roles="amq"/>
                <permission type="deleteAddress" roles="amq"/>
                <permission type="consume" roles="amq"/>
                <permission type="browse" roles="amq"/>
                <permission type="send" roles="amq"/>
                <!-- we need this otherwise ./artemis data imp wouldn't work -->
                <permission type="manage" roles="amq"/>
            </security-setting>
        </security-settings>
        <address-settings>
            <!-- if you define auto-create on certain queues, management has to be auto-create -->
            <address-setting match="activemq.management#">
                <dead-letter-address>DLQ</dead-letter-address>
                <expiry-address>ExpiryQueue</expiry-address>
                <redelivery-delay>0</redelivery-delay>
                <!-- with -1 only the global-max-size is in use for limiting -->
                <max-size-bytes>-1</max-size-bytes>
                <message-counter-history-day-limit>10</message-counter-history-day-limit>
                <address-full-policy>PAGE</address-full-policy>
                <auto-create-queues>true</auto-create-queues>
                <auto-create-addresses>true</auto-create-addresses>
            </address-setting>
            <!--default for catch all-->
            <address-setting match="#">
                <dead-letter-address>DLQ</dead-letter-address>
                <expiry-address>ExpiryQueue</expiry-address>
                <redelivery-delay>0</redelivery-delay>
                <max-size-bytes>-1</max-size-bytes>
                <max-size-messages>-1</max-size-messages>
                <page-size-bytes>10M</page-size-bytes>
                <max-read-page-messages>-1</max-read-page-messages>
                <max-read-page-bytes>20M</max-read-page-bytes>
                <message-counter-history-day-limit>10</message-counter-history-day-limit>
                <address-full-policy>PAGE</address-full-policy>
                <auto-create-queues>true</auto-create-queues>
                <auto-create-addresses>true</auto-create-addresses>
                <auto-delete-queues>false</auto-delete-queues>
                <auto-delete-addresses>false</auto-delete-addresses>
            </address-setting>
        </address-settings>
        <addresses>
            <address name="DLQ">
                <anycast>
                    <queue name="DLQ"/>
                </anycast>
            </address>
            <address name="ExpiryQueue">
                <anycast>
                    <queue name="ExpiryQueue"/>
                </anycast>
            </address>
        </addresses>
    </core>
</configuration>
dwbf0jvd

dwbf0jvd1#

在ActiveMQ Artemis的配置中,您将在amqps接受器上使用以下命令:

protocols=AMQP

这意味着该接受器将接受AMQP协议连接。
但是,在您的应用程序中,您使用的是使用OpenWire协议的org.apache.activemq.ActiveMQSslConnectionFactory,而不是AMQP。
因此,您需要更改接受器配置以支持OpenWire,例如:

protocols=AMQP,OPENWIRE

或者将应用程序正在使用的JMS实现更改为使用AMQP的Qpid JMS

相关问题