Camel 如何从ActiveMQ Artemis消息中获取重试计数

6qqygrtg  于 2022-12-23  发布在  Apache
关注(0)|答案(1)|浏览(218)

我正在从ActiveMQ Artemis队列中获取消息,并尝试将其发送到无效的HTTP端点(通过Camel路由),但此操作失败,消息将被重试。每当消息传递失败时,消息将在ActiveMQ Artemis级别重试。重试配置在代理中可用。是否有任何方法可以在客户端应用程序中获取JMS消息的当前重试计数?
我可以在数据库中维护每条消息的重试计数-寻找更好的解决方案/现有方法等。
如果我试图保持计数并在消息头中设置它,那么在重试过程中这些会丢失。重试就像是一个新消息一样发生。所以不能使用它。
我们将org.apache.camel.component.jms.JmsComponent与Qpid JMS客户端一起使用。

svgewumm

svgewumm1#

您应该检查消息上的JMSXDeliveryCount属性,JMS 1.1规范的3.5.9节对JMSXDeliveryCount做了如下说明:
邮件传递尝试的次数;第一个是1第二个是2...
我刚刚用Qpid JMS 1.7.0和ActiveMQ Artemis 2.27.1测试过,它在事务和非事务用例中都运行良好,在这两种情况下,JMSRedelivered头和JMSXDeliveryCount属性都设置得很好。
下面是事务处理用例的代码:

try (Connection connection = new JmsConnectionFactory("amqp://127.0.0.1:5672").createConnection()) {
   Session session = connection.createSession(true, Session.CLIENT_ACKNOWLEDGE);
   Queue queue = session.createQueue("myQueue");
   MessageConsumer consumer = session.createConsumer(queue);
   MessageProducer producer = session.createProducer(queue);
   TextMessage message = session.createTextMessage("test-message");
   producer.send(message);
   session.commit();
   producer.close();
   connection.start();
   message = (TextMessage) consumer.receive(1000);
   System.out.println("Redelivered? " + message.getJMSRedelivered());
   System.out.println("Delivery count: " + message.getIntProperty("JMSXDeliveryCount"));
   message.acknowledge();
   session.rollback();
   message = (TextMessage) consumer.receive(1000);
   System.out.println("Redelivered? " + message.getJMSRedelivered());
   System.out.println("Delivery count: " + message.getIntProperty("JMSXDeliveryCount"));
}

下面是非事务用例的代码:

try (Connection connection = new JmsConnectionFactory("amqp://127.0.0.1:5672").createConnection()) {
   Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
   Queue queue = session.createQueue("myQueue");
   MessageConsumer consumer = session.createConsumer(queue);
   MessageProducer producer = session.createProducer(queue);
   TextMessage message = session.createTextMessage("test-message");
   producer.send(message);
   producer.close();
   connection.start();
   message = (TextMessage) consumer.receive(1000);
   System.out.println("Redelivered? " + message.getJMSRedelivered());
   System.out.println("Delivery count: " + message.getIntProperty("JMSXDeliveryCount"));
   session.close();
   session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
   consumer = session.createConsumer(queue);
   message = (TextMessage) consumer.receive(1000);
   System.out.println("Redelivered? " + message.getJMSRedelivered());
   System.out.println("Delivery count: " + message.getIntProperty("JMSXDeliveryCount"));
}

在这两种情况下,打印的内容如下:

Redelivered? false
Delivery count: 1
Redelivered? true
Delivery count: 2

您看到的消息作为“新消息”重新传递的行为是预期的,如JMS 1.1规范的3.10节所述:
调用 clearBodyclearProperties 方法使消息体或属性可写后,使用者可以修改接收到的消息。如果使用者修改了接收到的消息,并且随后重新传递了该消息,则重新传递的消息必须是原始的、未修改的消息(JMS提供程序由于重新传递而修改的标头和属性除外,如 JMSRedelivered 标头和 JMSXDeliveryCount 属性)。
我不是Camel的Maven,所以我不能确切地说,当传递到HTTP端点失败时,Camel是如何处理JMS消息的。我所知道的是,这在直接的JMS中工作得很好。我希望Camel也能工作,但Camel可能正在执行自己的重试形式,而不是使用JMS提供的重新传递。
通过从QpidJMS客户机打开AMQP帧跟踪并检查消息传递和最终处置周围的帧,这对于验证来说相对简单。

相关问题