Spring集成平行剖分

wfypjpf4  于 2023-02-28  发布在  Spring
关注(0)|答案(1)|浏览(109)

我有这样的代码。是否可以控制第一次拆分的顺序?

` @Bean
  public TaskExecutor taskExecutor() {
    ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
    executor.setCorePoolSize(10);
    executor.setMaxPoolSize(50);
    return executor;
  }

  @Bean
  public IntegrationFlow firstFlow() {
    return IntegrationFlows.from("firstChannel")
        .split()
        .channel("inputChannel")
        .get();
  }
  
  @Bean
  public IntegrationFlow inputFlow() {
    return IntegrationFlows.from("inputChannel")
        .channel(MessageChannels.executor(taskExecutor()))
        .split()
        .handle(this::mapping)
        .aggregate()
        .channel("aggregateChannel")
        .get();
  }

  @Bean
  public IntegrationFlow aggregateFlow() {
    return IntegrationFlows.from("aggregateChannel")
        .aggregate()
        .get();
  }`

我希望对方法“mapping”进行异步处理,但仅当第一个消息出现在aggregateChannel中时,才开始处理来自第一个拆分的第二个消息并发送到inputChannel

tzdcorbm

tzdcorbm1#

因此,下面是一个可能解决方案的单元测试:

@SpringJUnitConfig
public class So75547720Tests {

    @Autowired
    BeanFactory beanFactory;

    @Test
    void sequentialSplitButSubSplitParallel() {
        List<String> firstList = List.of("1", "2", "3", "4");
        List<String> secondList = List.of("5", "6", "7", "8");
        List<List<String>> testData = List.of(firstList, secondList);

        MessagingTemplate messagingTemplate = new MessagingTemplate();
        messagingTemplate.setBeanFactory(this.beanFactory);

        List<List<String>> result = messagingTemplate.convertSendAndReceive("firstChannel", testData, List.class);
        assertThat(result).isNotNull().hasSize(2);
        assertThat(result.get(0)).hasSameElementsAs(firstList);
        assertThat(result.get(1)).hasSameElementsAs(secondList);
        System.out.println(result);
    }

    @Configuration
    @EnableIntegration
    public static class TestConfiguration {

        @Bean
        public TaskExecutor taskExecutor() {
            ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
            executor.setCorePoolSize(5);
            return executor;
        }

        @Bean
        public IntegrationFlow firstFlow() {
            return IntegrationFlow.from("firstChannel")
                    .split()
                    .channel("inputChannel")
                    .get();
        }

        @Bean
        public IntegrationFlow inputFlow() {
            return IntegrationFlow.from("inputChannel")
                    .gateway(subFlow -> subFlow
                            .split()
                            .channel(MessageChannels.executor(taskExecutor()))
                            .handle(this::mapping)
                            .aggregate())
                    .channel("aggregateChannel")
                    .get();
        }

        @Bean
        public IntegrationFlow aggregateFlow() {
            return IntegrationFlow.from("aggregateChannel")
                    .aggregate()
                    .get();
        }

        private String mapping(String payload, Map<String, ?> headers) {
            System.out.println("Handling thread: " + Thread.currentThread().getName() + " for: " + payload);
            return payload.toUpperCase();
        }

    }

}

第一个split()将项目按顺序发送到该inputChannel中。然后我们使用gateway作为子流。此网关将等待回复,以将其向前推送到下一个aggregateChannel。有趣的部分确实在该子流中,我们使用了第二个拆分器,该拆分器根据Executor通道并行发送项目。内部聚合器不会“t直到它收集了当前拆分的所有项目。只有在这之后,我们才从顶层拆分中获取下一个项目。
测试的结果可能如下所示:

Handling thread: taskExecutor-2 for: 2
Handling thread: taskExecutor-1 for: 1
Handling thread: taskExecutor-3 for: 3
Handling thread: taskExecutor-4 for: 4
Handling thread: taskExecutor-2 for: 6
Handling thread: taskExecutor-5 for: 5
Handling thread: taskExecutor-3 for: 7
Handling thread: taskExecutor-1 for: 8
[[2, 3, 1, 4], [6, 5, 7, 8]]

相关问题