java Spring Integration DSL:仅基于消息计数应用聚合器并以并行方式发送到Http端点

b1zrtrql  于 2023-08-02  发布在  Java
关注(0)|答案(1)|浏览(85)

我对 Spring 注射是全新的。我在寻找下面的逻辑:
1.消息将发布到队列通道(可以更改)。我只想节流
1.使用Spring注入从队列中聚合2条消息(不需要关联),并将其作为单个消息发送到Http端点。http端点有时可能会很慢。因此,我希望有10个并行线程并行处理消息。每个线程从队列中挑选2条消息并发送到Http终结点。

@Bean
public IntegrationFlow myFlow() {
       return IntegrationFlow.from(Constants.INBOUND_CHANNEL)
        .bridge(e ->         e.poller(Pollers.fixedRate(1000).maxMessagesPerPoll(5).taskExecutor(taskExecutor1())))
        .log()
        .aggregate(a -> a.correlationStrategy(m -> m != null).releaseStrategy(g -> g.size() == 2)
                .expireGroupsUponCompletion(true).expireGroupsUponTimeout(true).async(true))
        .log().handle(a -> {
            //Add code here to send the data to Http end point which takes around 5 secs.
            System.out.println("Thread Sleeping on ThreadId : " + Thread.currentThread().getId());
            try {
                Thread.sleep(5000);
            } catch (InterruptedException e1) {
                e1.printStackTrace();
            }
        }, e -> e.async(true)).get();
}

    @Bean
    TaskExecutor taskExecutor1() {
        ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
        taskExecutor.setCorePoolSize(2);
        return taskExecutor;
    }

  @Bean(name = Constants.INBOUND_CHANNEL)
  public QueueChannel inboundFilePollingChannel() {
      return new QueueChannel(100);
  }

字符串
上面的代码运行正常。但总体逻辑看起来是按顺序发生的。在释放聚合组之前,任务执行程序的其他线程不会选择要处理的消息,也不会将消息发送到Http终结点。
有没有什么方法的提示?

iih3973s

iih3973s1#

首先,如果service activator不返回CompletableFuture或Reactive Streams Publisher,则async(true)没有帮助。
那么你有这个问题:correlationStrategy(m -> m != null),它总是为您发送到此聚合器的所有消息返回trueAggregatingMessageHandler的进一步逻辑是这样做的:

UUID groupIdUuid = UUIDConverter.getUUID(correlationKey);
    Lock lock = this.lockRegistry.obtain(groupIdUuid.toString());

字符串
这是锁定的,直到我们完成了当前的消息。因此,即使你从不同的线程生成到这个聚合器,你仍然以顺序和阻塞的方式进行聚合。这是因为根据相关策略,您对所有消息使用相同的Lock示例。
还有一个问题看起来不像是并行调用HTTP服务。您需要考虑在aggregate()handle()之间添加ExecutorChannel。这样,聚合器的结果(两条消息)将被生成到另一个线程,当前线程将被释放以处理下一条聚合消息。
我们没有一个开箱即用的组件,只是按照消息到达的顺序发出一些消息:我们只有基于相关键的逻辑:AggregatingMessageHandlerResequencingMessageHandlerCorrelatingMessageBarrierFluxAggregatorMessageHandler
我认为你仍然可以依赖于聚合器中的顺序释放,使用你的通用相关键,但是你不需要在轮询器上使用taskExecutor,因为它没有任何效果,而是在聚合器之后使用channel(c -> c.executor())。这样,您将看到对HTTP服务的并行调用,每个调用包含2条消息。

相关问题