我正在尝试在Spring Boot中通过SSL向ActiveMQ Artemis发布/侦听消息,但没有找到任何关于如何做到这一点的文档。
在启动时尝试连接ActiveMQ时,出现了一个异常:channel has already failed。
我该如何定位这个问题呢?
org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'producer': Invocation of init method failed; nested exception is org.springframework.jms.UncategorizedJmsException: Uncategorized exception occurred during JMS processing; nested exception is javax.jms.JMSException: Cannot send, channel has already failed: tcp://127.0.0.1:5671
at org.springframework.beans.factory.annotation.InitDestroyAnnotationBeanPostProcessor.postProcessBeforeInitialization(InitDestroyAnnotationBeanPostProcessor.java:160) ~[spring-beans-5.3.23.jar:5.3.23]
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.applyBeanPostProcessorsBeforeInitialization(AbstractAutowireCapableBeanFactory.java:440) ~[spring-beans-5.3.23.jar:5.3.23]
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.initializeBean(AbstractAutowireCapableBeanFactory.java:1796) ~[spring-beans-5.3.23.jar:5.3.23]
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.doCreateBean(AbstractAutowireCapableBeanFactory.java:620) ~[spring-beans-5.3.23.jar:5.3.23]
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.createBean(AbstractAutowireCapableBeanFactory.java:542) ~[spring-beans-5.3.23.jar:5.3.23]
at org.springframework.beans.factory.support.AbstractBeanFactory.lambda$doGetBean$0(AbstractBeanFactory.java:335) ~[spring-beans-5.3.23.jar:5.3.23]
at org.springframework.beans.factory.support.DefaultSingletonBeanRegistry.getSingleton(DefaultSingletonBeanRegistry.java:234) ~[spring-beans-5.3.23.jar:5.3.23]
at org.springframework.beans.factory.support.AbstractBeanFactory.doGetBean(AbstractBeanFactory.java:333) ~[spring-beans-5.3.23.jar:5.3.23]
at org.springframework.beans.factory.support.AbstractBeanFactory.getBean(AbstractBeanFactory.java:208) ~[spring-beans-5.3.23.jar:5.3.23]
at org.springframework.beans.factory.support.DefaultListableBeanFactory.preInstantiateSingletons(DefaultListableBeanFactory.java:955) ~[spring-beans-5.3.23.jar:5.3.23]
at org.springframework.context.support.AbstractApplicationContext.finishBeanFactoryInitialization(AbstractApplicationContext.java:918) ~[spring-context-5.3.23.jar:5.3.23]
at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:583) ~[spring-context-5.3.23.jar:5.3.23]
at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:734) ~[spring-boot-2.7.5.jar:2.7.5]
at org.springframework.boot.SpringApplication.refreshContext(SpringApplication.java:408) ~[spring-boot-2.7.5.jar:2.7.5]
at org.springframework.boot.SpringApplication.run(SpringApplication.java:308) ~[spring-boot-2.7.5.jar:2.7.5]
at org.springframework.boot.SpringApplication.run(SpringApplication.java:1306) ~[spring-boot-2.7.5.jar:2.7.5]
at org.springframework.boot.SpringApplication.run(SpringApplication.java:1295) ~[spring-boot-2.7.5.jar:2.7.5]
at AmqpsDemoApplication.main(AmqpsDemoApplication.java:12) ~[classes/:na]
Caused by: org.springframework.jms.UncategorizedJmsException: Uncategorized exception occurred during JMS processing; nested exception is javax.jms.JMSException: Cannot send, channel has already failed: tcp://127.0.0.1:5671
at org.springframework.jms.support.JmsUtils.convertJmsAccessException(JmsUtils.java:311) ~[spring-jms-5.3.23.jar:5.3.23]
at org.springframework.jms.support.JmsAccessor.convertJmsAccessException(JmsAccessor.java:185) ~[spring-jms-5.3.23.jar:5.3.23]
at org.springframework.jms.core.JmsTemplate.execute(JmsTemplate.java:507) ~[spring-jms-5.3.23.jar:5.3.23]
at org.springframework.jms.core.JmsTemplate.send(JmsTemplate.java:584) ~[spring-jms-5.3.23.jar:5.3.23]
at queues.Producer.sendMessage(Producer.java:17) ~[classes/:na]
at queues.Producer.doIt(Producer.java:22) ~[classes/:na]
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:na]
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77) ~[na:na]
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:na]
at java.base/java.lang.reflect.Method.invoke(Method.java:568) ~[na:na]
at org.springframework.beans.factory.annotation.InitDestroyAnnotationBeanPostProcessor$LifecycleElement.invoke(InitDestroyAnnotationBeanPostProcessor.java:389) ~[spring-beans-5.3.23.jar:5.3.23]
at org.springframework.beans.factory.annotation.InitDestroyAnnotationBeanPostProcessor$LifecycleMetadata.invokeInitMethods(InitDestroyAnnotationBeanPostProcessor.java:333) ~[spring-beans-5.3.23.jar:5.3.23]
at org.springframework.beans.factory.annotation.InitDestroyAnnotationBeanPostProcessor.postProcessBeforeInitialization(InitDestroyAnnotationBeanPostProcessor.java:157) ~[spring-beans-5.3.23.jar:5.3.23]
... 17 common frames omitted
Caused by: javax.jms.JMSException: Cannot send, channel has already failed: tcp://127.0.0.1:5671
at org.apache.activemq.util.JMSExceptionSupport.create(JMSExceptionSupport.java:80) ~[activemq-client-5.16.5.jar:5.16.5]
at org.apache.activemq.ActiveMQConnection.syncSendPacket(ActiveMQConnection.java:1423) ~[activemq-client-5.16.5.jar:5.16.5]
at org.apache.activemq.ActiveMQConnection.ensureConnectionInfoSent(ActiveMQConnection.java:1488) ~[activemq-client-5.16.5.jar:5.16.5]
at org.apache.activemq.ActiveMQConnection.createSession(ActiveMQConnection.java:332) ~[activemq-client-5.16.5.jar:5.16.5]
at org.springframework.jms.support.JmsAccessor.createSession(JmsAccessor.java:208) ~[spring-jms-5.3.23.jar:5.3.23]
at org.springframework.jms.core.JmsTemplate.execute(JmsTemplate.java:495) ~[spring-jms-5.3.23.jar:5.3.23]
... 27 common frames omitted
Caused by: org.apache.activemq.transport.InactivityIOException: Cannot send, channel has already failed: tcp://127.0.0.1:5671
at org.apache.activemq.transport.AbstractInactivityMonitor.doOnewaySend(AbstractInactivityMonitor.java:328) ~[activemq-client-5.16.5.jar:5.16.5]
at org.apache.activemq.transport.AbstractInactivityMonitor.oneway(AbstractInactivityMonitor.java:317) ~[activemq-client-5.16.5.jar:5.16.5]
at org.apache.activemq.transport.TransportFilter.oneway(TransportFilter.java:94) ~[activemq-client-5.16.5.jar:5.16.5]
at org.apache.activemq.transport.WireFormatNegotiator.oneway(WireFormatNegotiator.java:116) ~[activemq-client-5.16.5.jar:5.16.5]
at org.apache.activemq.transport.MutexTransport.oneway(MutexTransport.java:68) ~[activemq-client-5.16.5.jar:5.16.5]
at org.apache.activemq.transport.ResponseCorrelator.asyncRequest(ResponseCorrelator.java:81) ~[activemq-client-5.16.5.jar:5.16.5]
at org.apache.activemq.transport.ResponseCorrelator.request(ResponseCorrelator.java:86) ~[activemq-client-5.16.5.jar:5.16.5]
at org.apache.activemq.ActiveMQConnection.syncSendPacket(ActiveMQConnection.java:1394) ~[activemq-client-5.16.5.jar:5.16.5]
... 31 common frames omitted
我遵循的步骤概述如下:
我按照此处(here)的说明创建了密钥库(keystore)/信任库(truststore)和证书,步骤如下:
1. Using keytool, create a certificate for the broker:
keytool -genkey -alias broker -keyalg RSA -keystore broker.ks
2. Export the broker’s certificate so it can be shared with clients:
keytool -export -alias broker -keystore broker.ks -file broker_cert
3. Create a certificate/keystore for the client:
keytool -genkey -alias client -keyalg RSA -keystore client.ks
4. Create a truststore for the client, and import the broker’s certificate. This establishes that the client “trusts” the broker:
keytool -import -alias broker -keystore client.ts -file broker_cert
在broker.xml中,我在端口5671上添加了一个acceptor,并配置了AMQP协议、sslEnabled以及密钥库/信任库的路径和密码。密钥库和信任库也已复制到代理(broker)上。如果我更改acceptor中的路径值,启动时就会报错,因此我认为它们位于正确的位置,并且代理可以访问它们。(完整的broker.xml附在最后。)
<acceptor name="amqps">tcp://0.0.0.0:5671?protocols=AMQP;sslEnabled=true;keyStorePath=/opt/app/store/broker.ks;keyStorePassword=password;trustStorePath=/opt/app/store/client.ts;trustStorePassword=password;;enabledProtocols=TLSv1.2;tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;useEpoll=true;amqpCredits=1000;amqpMinCredits=300</acceptor>
我的Spring Boot应用程序:
ActiveMQConfig.java:使用keystore/truststore和密码定义连接工厂
Producer.java:使用@Autowired注入的Spring JmsTemplate发送消息
AmqpsDemoApplication.java:一个空的@SpringBootApplication,带有main方法
application.properties:为空
broker.ks和client.ts文件:按照上述步骤创建后复制到应用程序中
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.7.5</version>
<relativePath/>
</parent>
<groupId>demo</groupId>
<artifactId>amqps-demo</artifactId>
<version>0.0.1-SNAPSHOT</version>
<properties>
<java.version>17</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-activemq</artifactId>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
ActiveMQConfig.java
设置安全连接工厂:
import org.apache.activemq.ActiveMQConnectionConsumer;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.ActiveMQSslConnectionFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jms.annotation.EnableJms;
import org.springframework.jms.core.JmsTemplate;
import javax.jms.ConnectionFactory;
/**
 * Spring configuration that builds the JMS connection factory used to reach the
 * broker over TLS.
 *
 * <p>The factory is the ActiveMQ 5.x client, which speaks the OpenWire protocol.
 * The broker acceptor listening on the target port must therefore allow OpenWire;
 * an acceptor restricted to AMQP will not accept this client.
 */
@Configuration
@EnableJms
public class ActiveMQConfig {
    // The "ssl" scheme is what selects the TLS transport. With a "tcp" URL the
    // client opens a plain socket even though ActiveMQSslConnectionFactory is used.
    String BROKER_URL = "ssl://localhost:5671"; // 5671 is the SSL acceptor port in broker.xml
    String BROKER_USERNAME = "admin";
    String BROKER_PASSWORD = "admin";

    /**
     * Creates the TLS-enabled connection factory.
     *
     * <p>The trust store is {@code client.ts} (it holds the imported broker
     * certificate, so the client can verify the broker). The key store is
     * {@code client.ks} (the client's own key pair, presented only if the broker
     * asks for client authentication). Both files must be resolvable by the
     * ActiveMQ client, e.g. from the working directory or the classpath.
     *
     * @return the configured connection factory
     * @throws IllegalStateException if the key store or trust store cannot be configured
     */
    @Bean
    public ActiveMQConnectionFactory connectionFactory() {
        final ActiveMQSslConnectionFactory factory = new ActiveMQSslConnectionFactory(BROKER_URL);
        factory.setUserName(BROKER_USERNAME);
        factory.setPassword(BROKER_PASSWORD);
        try {
            factory.setTrustStore("client.ts");
            factory.setKeyStore("client.ks");
        } catch (final Exception e) {
            // The setters declare "throws Exception"; keep the cause for diagnosis.
            throw new IllegalStateException("Unable to configure SSL stores for " + BROKER_URL, e);
        }
        factory.setTrustStorePassword("password");
        factory.setKeyStorePassword("password");
        return factory;
    }
}
Producer.java
,尝试将消息添加到队列
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
/**
 * Component that publishes a single dummy text message to {@code inbound.queue}
 * as soon as the bean has been initialised.
 */
@Component
public class Producer {
    /** Template used for sending; injected by Spring. */
    @Autowired
    JmsTemplate jmsTemplate;

    /**
     * Logs a notice to standard output and sends the text message
     * {@code "myValue"} to the {@code inbound.queue} destination.
     * Runs once after dependency injection because of {@code @PostConstruct}.
     */
    @PostConstruct
    public void sendMessage() {
        System.out.println("Sending myValue");
        jmsTemplate.send("inbound.queue", jmsSession -> {
            return jmsSession.createTextMessage("myValue");
        });
    }
}
完整的ActiveMQ broker.xml
如下:
<?xml version='1.0'?>
<configuration xmlns="urn:activemq" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:xi="http://www.w3.org/2001/XInclude" xsi:schemaLocation="urn:activemq /schema/artemis-configuration.xsd">
<core xmlns="urn:activemq:core" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="urn:activemq:core ">
<name>0.0.0.0</name>
<persistence-enabled>true</persistence-enabled>
<journal-type>NIO</journal-type>
<paging-directory>data/paging</paging-directory>
<bindings-directory>data/bindings</bindings-directory>
<journal-directory>data/journal</journal-directory>
<large-messages-directory>data/large-messages</large-messages-directory>
<journal-datasync>true</journal-datasync>
<journal-min-files>2</journal-min-files>
<journal-pool-files>10</journal-pool-files>
<journal-device-block-size>4096</journal-device-block-size>
<journal-file-size>10M</journal-file-size>
<journal-buffer-timeout>4040000</journal-buffer-timeout>
<journal-max-io>1</journal-max-io>
<disk-scan-period>5000</disk-scan-period>
<max-disk-usage>90</max-disk-usage>
<critical-analyzer>true</critical-analyzer>
<critical-analyzer-timeout>120000</critical-analyzer-timeout>
<critical-analyzer-check-period>60000</critical-analyzer-check-period>
<critical-analyzer-policy>HALT</critical-analyzer-policy>
<page-sync-timeout>4040000</page-sync-timeout>
<global-max-messages>-1</global-max-messages>
<acceptors>
<!-- AMQPS Acceptor. Here the stores and passwords are specified.-->
<!-- sslEnabled=true turns on TLS on port 5671 using the key store and trust store paths given below. -->
<!-- protocols=AMQP means only AMQP clients are accepted here; a client speaking another protocol (for example an OpenWire client) cannot use this acceptor as configured. -->
<acceptor name="amqps">tcp://0.0.0.0:5671?protocols=AMQP;sslEnabled=true;keyStorePath=/opt/app/store/broker.ks;keyStorePassword=password;trustStorePath=/opt/app/store/client.ts;trustStorePassword=password;;enabledProtocols=TLSv1.2;tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;useEpoll=true;amqpCredits=1000;amqpMinCredits=300</acceptor>
<!-- AMQP acceptor on port 5672 without sslEnabled; also limited to protocols=AMQP. -->
<acceptor name="amqp">tcp://0.0.0.0:5672?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=AMQP;useEpoll=true;amqpCredits=1000;amqpLowCredits=300;amqpMinLargeMessageSize=102400;amqpDuplicateDetection=true</acceptor>
</acceptors>
<security-settings>
<security-setting match="#">
<permission type="createNonDurableQueue" roles="amq"/>
<permission type="deleteNonDurableQueue" roles="amq"/>
<permission type="createDurableQueue" roles="amq"/>
<permission type="deleteDurableQueue" roles="amq"/>
<permission type="createAddress" roles="amq"/>
<permission type="deleteAddress" roles="amq"/>
<permission type="consume" roles="amq"/>
<permission type="browse" roles="amq"/>
<permission type="send" roles="amq"/>
<!-- we need this otherwise ./artemis data imp wouldn't work -->
<permission type="manage" roles="amq"/>
</security-setting>
</security-settings>
<address-settings>
<!-- if you define auto-create on certain queues, management has to be auto-create -->
<address-setting match="activemq.management#">
<dead-letter-address>DLQ</dead-letter-address>
<expiry-address>ExpiryQueue</expiry-address>
<redelivery-delay>0</redelivery-delay>
<!-- with -1 only the global-max-size is in use for limiting -->
<max-size-bytes>-1</max-size-bytes>
<message-counter-history-day-limit>10</message-counter-history-day-limit>
<address-full-policy>PAGE</address-full-policy>
<auto-create-queues>true</auto-create-queues>
<auto-create-addresses>true</auto-create-addresses>
</address-setting>
<!--default for catch all-->
<address-setting match="#">
<dead-letter-address>DLQ</dead-letter-address>
<expiry-address>ExpiryQueue</expiry-address>
<redelivery-delay>0</redelivery-delay>
<max-size-bytes>-1</max-size-bytes>
<max-size-messages>-1</max-size-messages>
<page-size-bytes>10M</page-size-bytes>
<max-read-page-messages>-1</max-read-page-messages>
<max-read-page-bytes>20M</max-read-page-bytes>
<message-counter-history-day-limit>10</message-counter-history-day-limit>
<address-full-policy>PAGE</address-full-policy>
<auto-create-queues>true</auto-create-queues>
<auto-create-addresses>true</auto-create-addresses>
<auto-delete-queues>false</auto-delete-queues>
<auto-delete-addresses>false</auto-delete-addresses>
</address-setting>
</address-settings>
<addresses>
<address name="DLQ">
<anycast>
<queue name="DLQ"/>
</anycast>
</address>
<address name="ExpiryQueue">
<anycast>
<queue name="ExpiryQueue"/>
</anycast>
</address>
</addresses>
</core>
</configuration>
1条答案
按热度按时间dwbf0jvd1#
在ActiveMQ Artemis的配置中,您在amqps接受器(acceptor)上使用了以下参数:
protocols=AMQP
这意味着该接受器将只接受AMQP协议的连接。
但是,在您的应用程序中,您使用的是使用OpenWire协议的
org.apache.activemq.ActiveMQSslConnectionFactory
,而不是AMQP。因此,您需要更改接受器配置以支持OpenWire,例如:
protocols=AMQP,OPENWIRE
或者将应用程序正在使用的JMS实现更改为使用AMQP的Qpid JMS。