spring集成中向amqp代理传递持久性的消息存储

wnrlj8wa  于 2021-07-13  发布在  Java
关注(0)|答案(1)|浏览(419)

我正在尝试构建集成流,它将防止在传递到amqp代理(rabbitmq)的过程中丢失消息。在broker停止的情况下,我看到一些出乎意料的行为:
失败的邮件正在保存到邮件存储,但保存时间不会太长。这个流不是在等待代理的可用性,它从消息存储中提取消息,即使代理仍然被停止
如果成功重新启动rabbitmq,则消息存储区中的记录(如果仍然存在)不会传递到队列。
请帮我调查一下。代码示例:

  1. @Bean
  2. public MessageChannel messageStoreBackedChannel() {
  3. return new QueueChannel(
  4. new MessageGroupQueue(jdbcChannelMessageStore(), "Group_ID")
  5. );
  6. }
  7. @Bean
  8. public IntegrationFlow someFlow() {
  9. return IntegrationFlows
  10. .from("messageStoreBackedChannel")
  11. .channel("amqpMessageChannel")
  12. .get();
  13. }
  14. @Bean
  15. public IntegrationFlow jmsExtractFlow(EntityManagerFactory entityManagerFactory) {
  16. return IntegrationFlows
  17. .from("amqpMessageChannel")
  18. .handle(message -> System.out.println(message.getPayload()))
  19. .get();
  20. }
  21. @Bean
  22. public MessageChannel amqpMessageChannel() {
  23. return new PollableAmqpChannel("amqpMessageChannel", amqpTemplate);
  24. }
  25. @Bean
  26. public JdbcChannelMessageStore jdbcChannelMessageStore() {
  27. var jdbcChannelMessageStore = new JdbcChannelMessageStore(dataSource);
  28. jdbcChannelMessageStore.setChannelMessageStoreQueryProvider(new PostgresChannelMessageStoreQueryProvider());
  29. return jdbcChannelMessageStore;
  30. }
  31. @Bean(name = PollerMetadata.DEFAULT_POLLER)
  32. public PollerMetadata defaultPoller() {
  33. PollerMetadata pollerMetadata = new PollerMetadata();
  34. pollerMetadata.setTrigger(new PeriodicTrigger(10));
  35. return pollerMetadata;
  36. }
dy1byipe

dy1byipe1#

请考虑在 .from("messageStoreBackedChannel").channel("amqpMessageChannel") 作为 transactional() .
像这样:

  1. .from("messageStoreBackedChannel")
  2. .bridge(e -> e.poller(p -> p.fixedDelay(10).transactional()))
  3. .channel("amqpMessageChannel")

所以,无论何时 amqpMessageChannel 失败,事务将回滚,失败的消息将返回到存储区,直到下一次轮询。
你当然可以阻止 bridge 当连接到rabbitmq时出现错误。但你怎么确定这种联系又回来了呢?。。

相关问题