我对 Spring 注射是全新的。我在寻找下面的逻辑:
1.消息将发布到队列通道(可以更改)。我只想节流
1.使用Spring注入从队列中聚合2条消息(不需要关联),并将其作为单个消息发送到Http端点。http端点有时可能会很慢。因此,我希望有10个并行线程并行处理消息。每个线程从队列中挑选2条消息并发送到Http终结点。
@Bean
public IntegrationFlow myFlow() {
return IntegrationFlow.from(Constants.INBOUND_CHANNEL)
.bridge(e -> e.poller(Pollers.fixedRate(1000).maxMessagesPerPoll(5).taskExecutor(taskExecutor1())))
.log()
.aggregate(a -> a.correlationStrategy(m -> m != null).releaseStrategy(g -> g.size() == 2)
.expireGroupsUponCompletion(true).expireGroupsUponTimeout(true).async(true))
.log().handle(a -> {
//Add code here to send the data to Http end point which takes around 5 secs.
System.out.println("Thread Sleeping on ThreadId : " + Thread.currentThread().getId());
try {
Thread.sleep(5000);
} catch (InterruptedException e1) {
e1.printStackTrace();
}
}, e -> e.async(true)).get();
}
@Bean
TaskExecutor taskExecutor1() {
ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
taskExecutor.setCorePoolSize(2);
return taskExecutor;
}
@Bean(name = Constants.INBOUND_CHANNEL)
public QueueChannel inboundFilePollingChannel() {
return new QueueChannel(100);
}
字符串
上面的代码运行正常。但总体逻辑看起来是按顺序发生的。在释放聚合组之前,任务执行程序的其他线程不会选择要处理的消息,也不会将消息发送到Http终结点。
有没有什么方法的提示?
1条答案
按热度按时间iih3973s1#
首先,如果service activator不返回
CompletableFuture
或Reactive StreamsPublisher
,则async(true)
没有帮助。那么你有这个问题:
correlationStrategy(m -> m != null)
,它总是为您发送到此聚合器的所有消息返回true
。AggregatingMessageHandler
的进一步逻辑是这样做的:字符串
这是锁定的,直到我们完成了当前的消息。因此,即使你从不同的线程生成到这个聚合器,你仍然以顺序和阻塞的方式进行聚合。这是因为根据相关策略,您对所有消息使用相同的
Lock
示例。还有一个问题看起来不像是并行调用HTTP服务。您需要考虑在
aggregate()
和handle()
之间添加ExecutorChannel
。这样,聚合器的结果(两条消息)将被生成到另一个线程,当前线程将被释放以处理下一条聚合消息。我们没有一个开箱即用的组件,只是按照消息到达的顺序发出一些消息:我们只有基于相关键的逻辑:
AggregatingMessageHandler
、ResequencingMessageHandler
、CorrelatingMessageBarrier
和FluxAggregatorMessageHandler
。我认为你仍然可以依赖于聚合器中的顺序释放,使用你的通用相关键,但是你不需要在轮询器上使用
taskExecutor
,因为它没有任何效果,而是在聚合器之后使用channel(c -> c.executor())
。这样,您将看到对HTTP服务的并行调用,每个调用包含2条消息。