使用Apache Camel ConsumerTemplate从ActiveMQ Artemis轮询字节/大消息

cyej8jka  于 2023-10-18  发布在  Apache
关注(0)|答案(1)|浏览(147)

当通过JMS连接到ActiveMQ Artemis时,我正在努力解决基于Apache Camel的应用程序中的一个问题。在其中一个Camel路由的末尾,消息存储在Artemis JMS队列中。在同一应用程序中运行的遗留组件使用ConsumerTemplate定期从那里获取它们。
这对于带有纯文本正文的Camel消息很有效,但在使用字节数组正文时会导致错误:Artemis似乎将任何带有字节体的消息视为"large message", which are streamed instead of kept in memory。通过ConsumerTemplate接收可以工作,但是一旦访问了body或header,就会引发如下异常:

org.apache.camel.RuntimeCamelException: Failed to extract body due to: javax.jms.IllegalStateException: AMQ119023: The large message lost connection with its session, either because of a rollback or a closed session. Message: ActiveMQMessage[ID:90c4d1d5-3233-11ea-b0cc-44032c68a56f]:PERSISTENT/ClientLargeMessageImpl[messageID=2974, durable=true, address=mytest,userID=90c4d1d5-3233-11ea-b0cc-44032c68a56f,properties=TypedProperties[firedTime=Wed Jan 08 17:26:03 CET 2020,__AMQ_CID=90b4f34e-3233-11ea-b0cc-44032c68a56f,breadcrumbId=ID-NB045-evolit-co-at-1578500762151-0-1,_AMQ_ROUTING_TYPE=1,_AMQ_LARGE_SIZE=3]]
        at org.apache.camel.component.jms.JmsBinding.extractBodyFromJms(JmsBinding.java:172) ~[camel-jms-2.22.1.jar:2.22.1]
        at org.apache.camel.component.jms.JmsMessage.createBody(JmsMessage.java:221) ~[camel-jms-2.22.1.jar:2.22.1]
        at org.apache.camel.impl.MessageSupport.getBody(MessageSupport.java:54) ~[camel-core-2.22.1.jar:2.22.1]
        at org.apache.camel.example.cdi.JmsPoller.someMethod(JmsPoller.java:36) ~[classes/:?]
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_171]
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:1.8.0_171]
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_171]
        at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_171]
        at org.apache.camel.component.bean.MethodInfo.invoke(MethodInfo.java:481) ~[camel-core-2.22.1.jar:2.22.1]
        at org.apache.camel.component.bean.MethodInfo$1.doProceed(MethodInfo.java:300) ~[camel-core-2.22.1.jar:2.22.1]
        at org.apache.camel.component.bean.MethodInfo$1.proceed(MethodInfo.java:273) ~[camel-core-2.22.1.jar:2.22.1]
        at org.apache.camel.component.bean.AbstractBeanProcessor.process(AbstractBeanProcessor.java:188) ~[camel-core-2.22.1.jar:2.22.1]
        at org.apache.camel.component.bean.BeanProcessor.process(BeanProcessor.java:53) ~[camel-core-2.22.1.jar:2.22.1]
        at org.apache.camel.component.bean.BeanProducer.process(BeanProducer.java:41) ~[camel-core-2.22.1.jar:2.22.1]
        at org.apache.camel.processor.SendProcessor.process(SendProcessor.java:148) ~[camel-core-2.22.1.jar:2.22.1]
        at org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:548) [camel-core-2.22.1.jar:2.22.1]
        at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:201) [camel-core-2.22.1.jar:2.22.1]
        at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:201) [camel-core-2.22.1.jar:2.22.1]
        at org.apache.camel.component.timer.TimerConsumer.sendTimerExchange(TimerConsumer.java:197) [camel-core-2.22.1.jar:2.22.1]
        at org.apache.camel.component.timer.TimerConsumer$1.run(TimerConsumer.java:79) [camel-core-2.22.1.jar:2.22.1]
        at java.util.TimerThread.mainLoop(Timer.java:555) [?:1.8.0_171]
        at java.util.TimerThread.run(Timer.java:505) [?:1.8.0_171]
Caused by: javax.jms.IllegalStateException: AMQ119023: The large message lost connection with its session, either because of a rollback or a closed session
        at org.apache.activemq.artemis.core.client.impl.LargeMessageControllerImpl.saveBuffer(LargeMessageControllerImpl.java:273) ~[artemis-core-client-2.6.2.jar:2.6.2]
        at org.apache.activemq.artemis.core.client.impl.ClientLargeMessageImpl.saveToOutputStream(ClientLargeMessageImpl.java:115) ~[artemis-core-client-2.6.2.jar:2.6.2]
        at org.apache.activemq.artemis.jms.client.ActiveMQMessage.saveToOutputStream(ActiveMQMessage.java:853) ~[artemis-jms-client-2.6.2.jar:2.6.2]
        at org.apache.activemq.artemis.jms.client.ActiveMQMessage.setObjectProperty(ActiveMQMessage.java:693) ~[artemis-jms-client-2.6.2.jar:2.6.2]
        at org.apache.camel.component.jms.JmsBinding.createByteArrayFromBytesMessage(JmsBinding.java:251) ~[camel-jms-2.22.1.jar:2.22.1]
        at org.apache.camel.component.jms.JmsBinding.extractBodyFromJms(JmsBinding.java:163) ~[camel-jms-2.22.1.jar:2.22.1]
        ... 21 more
Caused by: org.apache.activemq.artemis.api.core.ActiveMQIllegalStateException: AMQ119023: The large message lost connection with its session, either because of a rollback or a closed session
        at org.apache.activemq.artemis.core.client.impl.LargeMessageControllerImpl.saveBuffer(LargeMessageControllerImpl.java:273) ~[artemis-core-client-2.6.2.jar:2.6.2]
        at org.apache.activemq.artemis.core.client.impl.ClientLargeMessageImpl.saveToOutputStream(ClientLargeMessageImpl.java:115) ~[artemis-core-client-2.6.2.jar:2.6.2]
        at org.apache.activemq.artemis.jms.client.ActiveMQMessage.saveToOutputStream(ActiveMQMessage.java:853) ~[artemis-jms-client-2.6.2.jar:2.6.2]
        at org.apache.activemq.artemis.jms.client.ActiveMQMessage.setObjectProperty(ActiveMQMessage.java:693) ~[artemis-jms-client-2.6.2.jar:2.6.2]
        at org.apache.camel.component.jms.JmsBinding.createByteArrayFromBytesMessage(JmsBinding.java:251) ~[camel-jms-2.22.1.jar:2.22.1]
        at org.apache.camel.component.jms.JmsBinding.extractBodyFromJms(JmsBinding.java:163) ~[camel-jms-2.22.1.jar:2.22.1]
        ... 21 more

这个问题也发生在不超过Artemis的minLargeMessageSize的消息中,在测试程序中甚至是3字节。
巧合的是,同样的问题也发生在用于测试应用程序的独立应用程序中。在那里,我能够通过保持JMS会话和接收器打开直到JMS消息体和消息头被完全读取来解决这个问题。对于Camel,这在Camel所基于的Spring JmsTemplate中被抽象掉了。
我咨询了user documentation of the Camel JMS component来寻找可能对我有帮助的配置选项。我尝试了以下方法:

  • eagerLoadingOfProperties=true在消费者端:没有影响,似乎只影响MessageListenerContainer。该文档说:

它使用Spring的JmsTemplate进行发送,使用MessageListenerContainer进行消费。
但是,在调试时,似乎只有在使用来自Camel路由中的JMS端点的消息时才使用MessageListenerContainer。使用ConsumerTemplate,就像我的例子一样,使用JmsTemplate进行消费。

  • 消费者端messageConvertermapJmsMessage:无效,它们在会话已关闭时执行
  • alwaysCopyMessage在生产者端:我想也许复制可以防止使用流式传输的大消息,没有效果
  • 生产者端streamMessageTypeEnabled=false:没有影响
  • jmsMessageType=Bytes在生产者和消费者方面:没有影响
  • transferExchange=true在生产者和消费者方面:这似乎解决了我的具体情况,但它感觉像一个变通办法。文档建议谨慎使用该选项。

所以现在,transferExchange似乎是我最好的选择,假设它在所有测试用例中都能解决我的问题。尽管如此,我还是很高兴能更好地了解这个问题或不同的解决方案:
1.为什么阿耳忒弥斯把小字节数组消息当作大消息?

  1. Camel ConsumerTemplate是否支持流式的大消息?
    我的版本是Camel 2.22.1和Artemis 2.10.1。
    通过修改Camel发布包中的Camel Example camel-example-cdi,我已经能够重现我的问题,使其具有如下所示的最小类。此外,我还添加了camel-jms和Artemis依赖项,并在本地启动了Artemis,两者都与camel-example-artemis-large-messages示例中描述的一样。
public class MyRoutes extends RouteBuilder {

    @Override
    public void configure() {
        setupJmsComponent();

        from("timer:writeTimer?period=6000")
                .log("writing to JMS")
                .setBody(() -> new byte[]{0,1,2})
                .to(JmsPoller.ENDPOINT);

        from("timer:pollTimer?period=3000")
            .to("bean:jmsPoller");
    }

    private void setupJmsComponent() {
        ActiveMQJMSConnectionFactory connectionFactory = new ActiveMQJMSConnectionFactory("tcp://localhost:61616");
        JmsComponent jmsComponent = new JmsComponent();
        jmsComponent.setConnectionFactory(connectionFactory);
        getContext().addComponent("jms", jmsComponent);
    }

}
@Singleton
@Named("jmsPoller")
public class JmsPoller {
    static final String ENDPOINT = "jms:queue:mytest";

    @Inject
    private ConsumerTemplate consumerTemplate;

    public void someMethod(String body) {
        Exchange exchange = consumerTemplate.receive(ENDPOINT, 1000L);
        System.out.println("Received " + (exchange == null ? null : exchange.getIn().getBody()));
    }

}
92dk7w1h

92dk7w1h1#

ActiveMQ Artemis并不把任何带有字节体的消息都视为“大”消息。值得注意的是,代理最终将所有消息体视为一个字节数组,因为它们确实是字节数组。然而,为了被认为是“大”的消息必须超过一定的大小。The documentation声明:
任何大于特定大小的消息都被视为大消息。大型消息将被拆分并以片段形式发送。这由URL参数minLargeMessageSize决定。

备注

Apache ActiveMQ Artemis消息使用每个字符2个字节进行编码,因此如果消息数据用ASCII字符(1个字节)填充,则生成的Apache ActiveMQ Artemis消息的大小大约会增加一倍。这在计算“大”消息的大小时很重要,因为它在发送之前可能看起来小于minLargeMessageSize,但一旦编码,它就会变成“大”消息。
默认值为100KiB。
看起来应用程序的用例根本不符合ActiveMQ Artemis中大消息支持的语义,因为消息来自的会话在消息体完全接收之前就被关闭了。
因此,我建议您要么保持会话打开,直到消息体被读取,要么增加发送消息的应用程序的URL上的minLargeMessageSize,这样就不会有消息被认为是“大”的。后一种选择可能会导致代理上更大的内存使用,因为整个消息体将立即保存在内存中。

相关问题