我正在使用Apache Camel连接和订阅ActiveMQ Artemis提要,并转发到Kafka主题。
它可以正常运行一段时间,但随后停止,出现异常:jakarta.jms.JMSSecurityException: username ' [[email protected]](https://stackoverflow.com/cdn-cgi/l/email-protection) ' has too many connections
所以很明显Camel的工作方式中有一些东西是ActiveMQ Artemis或连接那端的一些安全设置不喜欢的。我知道我连接到的提要确实有一些检查,以防止应用程序快速连续多次连接和断开连接;所以我想我击中了这一点。
仔细研究一下,看起来camel会定期重新连接和拉取,而不是创建一个连接(甚至每个Camel Route都是一个连接);所以我认为这会导致代理将我踢出。
有其他人遇到过吗?有什么想法吗?
参考代码:
ActiveMQComponent amqComponent = new ActiveMQComponent();
amqComponent.setUsername(amqUser);
amqComponent.setPassword(amqPassword);
amqComponent.setClientId("clientID");
ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory();
cf.setTrustedPackages(List.of("com..."));
cf.setBrokerURL("tcp://" + amqServer + ":" + amqPort + "?jms.watchTopicAdvisories=false");
amqComponent.setConnectionFactory(cf);
//amqComponent.setMaxConcurrentConsumers(1);
//amqComponent.setSubscriptionShared(true);
//amqComponent.setUsePooledConnection(true);
context.addComponent("activemq", amqComponent);
//...
from("activemq:topic:" + amqFeedTopic + "?clientId=" + clientid + "&durableSubscriptionName=" + clientid + "-sub")
.id(clientid)
.description("Connection for " + clientid)
.to("kafka:" + kafkaTopic + "?brokers=" + kafkaBootstrapServers);
字符串
1条答案
按热度按时间wnrlj8wa1#
这个答案:Spring Integration: How to use SingleConnectionFactory with ActiveMQ?为我指明了正确的方向。
从本质上讲,连接工厂决定了方法;默认的连接工厂将定期断开和重新连接。
要解决这个问题,请使用
SingleConnectionFactory
或它的子类CachingConnectionFactory
。现在,它们不能像标准AMQ那样接受用户/密码,因此您需要使用
UserCredentialsConnectionFactoryAdapter
来提供这些。很简单,对吧?
下面是一些代码:
字符串
另外请注意,使用这种方法,您只能有一个Camel Route使用此Caching Connection Factory。如果您尝试添加多个Camel Route,则会在其他路由上出现错误。
要解决这个问题,您需要为每个Route使用一个全新的Camel Context,并在每个Route上配置这些连接工厂。