我有这样的代码。是否可以控制第一次拆分的顺序?
` @Bean
public TaskExecutor taskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(10);
executor.setMaxPoolSize(50);
return executor;
}
@Bean
public IntegrationFlow firstFlow() {
return IntegrationFlows.from("firstChannel")
.split()
.channel("inputChannel")
.get();
}
@Bean
public IntegrationFlow inputFlow() {
return IntegrationFlows.from("inputChannel")
.channel(MessageChannels.executor(taskExecutor()))
.split()
.handle(this::mapping)
.aggregate()
.channel("aggregateChannel")
.get();
}
@Bean
public IntegrationFlow aggregateFlow() {
return IntegrationFlows.from("aggregateChannel")
.aggregate()
.get();
}`
我希望对方法“mapping”进行异步处理,但仅当第一个消息出现在aggregateChannel中时,才开始处理来自第一个拆分的第二个消息并发送到inputChannel
1条答案
按热度按时间tzdcorbm1#
因此,下面是一个可能解决方案的单元测试:
第一个
split()
将项目按顺序发送到该inputChannel
中。然后我们使用gateway
作为子流。此网关将等待回复,以将其向前推送到下一个aggregateChannel
。有趣的部分确实在该子流中,我们使用了第二个拆分器,该拆分器根据Executor
通道并行发送项目。内部聚合器不会“t直到它收集了当前拆分的所有项目。只有在这之后,我们才从顶层拆分中获取下一个项目。测试的结果可能如下所示: