在带有远程分区的并发spring批处理作业期间,工作机应答中的消息交叉

fv2wmkja  于 2021-07-24  发布在  Java
关注(0)|答案(0)|浏览(299)

有人问过这个问题,但我认为没有人回答。唯一的答案是aggregator如何使用correlationid。但真正的问题是如何在不检查回复中的jobexecutionid的情况下更新作业状态。我没有足够的声誉来评论现有的问题,所以再问一次。
根据javadoc MessageChannelPartitionHandler 它应该是步骤或作业范围。在我们使用的远程分区场景中 RemotePartitioningManagerStepBuilder 生成不允许设置partitionhandler的管理器步骤。假设每个作业将在rabbitmq上使用相同的队列,则当接收到工作节点回复时,消息将被交叉。有没有简单的方法来重现这一点,但我可以看到这种行为使用一些手动步骤如下
启动第一个作业
在worker可以回复之前杀死manager节点
让工作节点完成对所有分区的处理,并在rabbitmq上发送回复
再次启动管理器节点并启动新作业
有某种机制使第二个作业失败,即读写器显式失败
检查2个作业的状态
预期结果:作业1标记为已完成,作业2标记为失败
实际结果:job-1保持在started状态,job-2被标记为completed,尽管其工作步骤被标记为failed
下面是显示如何配置管理器和工作器步骤的示例代码

@Bean
public Step importDataStep(RemotePartitioningManagerStepBuilderFactory managerStepBuilderFactory) {

    return managerStepBuilderFactory.get()
      .<String, String>partitioner("worker", partitioner())
      .gridSize(2)
      .outputChannel(outgoingRequestsToWorkers)
      .inputChannel(incomingRepliesFromWorkers)
      .listener(stepExecutionListener)
      .build();
}

@Bean
public Step worker(
RemotePartitioningWorkerStepBuilderFactory workerStepBuilderFactory) {
    return workerStepBuilderFactory.get("worker")
          .listener(stepExecutionListener)
          .inputChannel(incomingRequestsFromManager())
          .outputChannel(outgoingRepliesToManager())
          .<String, String>chunk(10)
          .reader(itemReader())
          .processor(itemProcessor())
          .writer(itemWriter());
}

或者,我可以考虑在不发生消息交叉的情况下使用轮询而不是回复。但是,如果管理器节点在工作节点处理时崩溃,则无法重新启动轮询。如果我使用轮询执行上述步骤
实际结果:作业1仍处于启动状态,作业2按预期标记为失败
在轮询的情况下不会发生此问题,因为每个轮询器都使用准确的jobexecutionid来轮询和更新相应的管理器步骤/作业。
我做错什么了?有没有更好的方法来处理这种情况?

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题