spring集成中向amqp代理传递持久性的消息存储

wnrlj8wa  于 2021-07-13  发布在  Java
关注(0)|答案(1)|浏览(413)

我正在尝试构建集成流,它将防止在传递到amqp代理(rabbitmq)的过程中丢失消息。在broker停止的情况下,我看到一些出乎意料的行为:
失败的邮件正在保存到邮件存储,但保存时间不会太长。这个流不是在等待代理的可用性,它从消息存储中提取消息,即使代理仍然被停止
如果成功重新启动rabbitmq,则消息存储区中的记录(如果仍然存在)不会传递到队列。
请帮我调查一下。代码示例:

@Bean
public MessageChannel messageStoreBackedChannel() {
    return new QueueChannel(
            new MessageGroupQueue(jdbcChannelMessageStore(), "Group_ID")
    );
}

 @Bean
public IntegrationFlow someFlow() {
    return IntegrationFlows
            .from("messageStoreBackedChannel")
            .channel("amqpMessageChannel")
            .get();
}

@Bean
public IntegrationFlow jmsExtractFlow(EntityManagerFactory entityManagerFactory) {
    return IntegrationFlows
            .from("amqpMessageChannel")
            .handle(message -> System.out.println(message.getPayload()))
            .get();
}

@Bean
public MessageChannel amqpMessageChannel() {
    return new PollableAmqpChannel("amqpMessageChannel", amqpTemplate);
}

@Bean
public JdbcChannelMessageStore jdbcChannelMessageStore() {
    var jdbcChannelMessageStore = new JdbcChannelMessageStore(dataSource);
    jdbcChannelMessageStore.setChannelMessageStoreQueryProvider(new PostgresChannelMessageStoreQueryProvider());

    return jdbcChannelMessageStore;
}

@Bean(name = PollerMetadata.DEFAULT_POLLER)
public PollerMetadata defaultPoller() {
    PollerMetadata pollerMetadata = new PollerMetadata();
    pollerMetadata.setTrigger(new PeriodicTrigger(10));
    return pollerMetadata;
}
dy1byipe

dy1byipe1#

请考虑在 .from("messageStoreBackedChannel").channel("amqpMessageChannel") 作为 transactional() .
像这样:

.from("messageStoreBackedChannel")
.bridge(e -> e.poller(p -> p.fixedDelay(10).transactional()))
.channel("amqpMessageChannel")

所以,无论何时 amqpMessageChannel 失败,事务将回滚,失败的消息将返回到存储区,直到下一次轮询。
你当然可以阻止 bridge 当连接到rabbitmq时出现错误。但你怎么确定这种联系又回来了呢?。。

相关问题