Camel多次连接到ActiveMQ Artemis

vs91vp4v  于 2023-11-18  发布在  Apache
关注(0)|答案(1)|浏览(150)

我正在使用Apache Camel连接和订阅ActiveMQ Artemis提要,并转发到Kafka主题。
它可以正常运行一段时间,但随后停止,出现异常:jakarta.jms.JMSSecurityException: username ' [[email protected]](https://stackoverflow.com/cdn-cgi/l/email-protection) ' has too many connections
所以很明显Camel的工作方式中有一些东西是ActiveMQ Artemis或连接那端的一些安全设置不喜欢的。我知道我连接到的提要确实有一些检查,以防止应用程序快速连续多次连接和断开连接;所以我想我击中了这一点。
仔细研究一下,看起来camel会定期重新连接和拉取,而不是创建一个连接(甚至每个Camel Route都是一个连接);所以我认为这会导致代理将我踢出。
有其他人遇到过吗?有什么想法吗?
参考代码:

ActiveMQComponent amqComponent = new ActiveMQComponent();
amqComponent.setUsername(amqUser);
amqComponent.setPassword(amqPassword);
amqComponent.setClientId("clientID");

ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory();
cf.setTrustedPackages(List.of("com..."));
cf.setBrokerURL("tcp://" + amqServer + ":" + amqPort + "?jms.watchTopicAdvisories=false");
amqComponent.setConnectionFactory(cf);
//amqComponent.setMaxConcurrentConsumers(1);
//amqComponent.setSubscriptionShared(true);
//amqComponent.setUsePooledConnection(true);
           
context.addComponent("activemq", amqComponent);

//...

from("activemq:topic:" + amqFeedTopic + "?clientId=" + clientid  + "&durableSubscriptionName=" + clientid + "-sub")
                .id(clientid)
                .description("Connection for " + clientid)
                .to("kafka:" + kafkaTopic + "?brokers=" + kafkaBootstrapServers);

字符串

wnrlj8wa

wnrlj8wa1#

这个答案:Spring Integration: How to use SingleConnectionFactory with ActiveMQ?为我指明了正确的方向。
从本质上讲,连接工厂决定了方法;默认的连接工厂将定期断开和重新连接。

  • 为了清楚起见,是Spring JMS代码,而不是连接工厂,正在执行断开和重新连接。事实上,即使使用下面概述的解决方案,它仍然这样做。缓存连接工厂向Spring JMS提供代理并保持连接打开。感谢Doug格罗夫在评论中指出这一点。*

要解决这个问题,请使用SingleConnectionFactory或它的子类CachingConnectionFactory
现在,它们不能像标准AMQ那样接受用户/密码,因此您需要使用UserCredentialsConnectionFactoryAdapter来提供这些。
很简单,对吧?
下面是一些代码:

ActiveMQComponent amqComponent = new ActiveMQComponent();
ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory();
cf.setBrokerURL("tcp://" + amqServer + ":" + amqPort + " jms.watchTopicAdvisories=false");
       
UserCredentialsConnectionFactoryAdapter uca = new UserCredentialsConnectionFactoryAdapter();
uca.setUsername(amqUser);
uca.setPassword(amqPassword);
uca.setTargetConnectionFactory(cf);
            
CachingConnectionFactory ccf = new CachingConnectionFactory(uca);
ccf.setClientId(amqClientID);
            
amqComponent.setConnectionFactory(ccf);
amqComponent.setMaxConcurrentConsumers(1);
            
context.addComponent("activemq", amqComponent);

字符串
另外请注意,使用这种方法,您只能有一个Camel Route使用此Caching Connection Factory。如果您尝试添加多个Camel Route,则会在其他路由上出现错误。
要解决这个问题,您需要为每个Route使用一个全新的Camel Context,并在每个Route上配置这些连接工厂。

相关问题