java Spring Batch真的可以防止并发执行相同的作业吗?它真的是幂等的吗?

0md85ypi  于 2023-03-21  发布在  Java
关注(0)|答案(1)|浏览(197)

我研究了Spring Batch,因为我必须使用它,我想知道Spring Batch是否真的可以防止使用相同的参数执行相同的作业。
在本文中,“相同的作业”表示“具有实际相同的jobName和jobParameters的作业”。
我得到了一些积分。

  • Spring Batch可以防止执行以前从未执行过的相同多个作业
  • Spring Batch可以防止在前一个作业成功后执行相同的作业

但我有个问题:如果上次执行失败,执行相同的作业怎么样?

在检查Spring Batch代码时,我觉得这种情况可能会发生,但很少发生。

@Override
    public JobExecution createJobExecution(String jobName, JobParameters jobParameters)
            throws JobExecutionAlreadyRunningException, JobRestartException, JobInstanceAlreadyCompleteException {

        Assert.notNull(jobName, "Job name must not be null.");
        Assert.notNull(jobParameters, "JobParameters must not be null.");

        /*
         * Find all jobs matching the runtime information.
         *
         * If this method is transactional, and the isolation level is
         * REPEATABLE_READ or better, another launcher trying to start the same
         * job in another thread or process will block until this transaction
         * has finished.
         */

        JobInstance jobInstance = jobInstanceDao.getJobInstance(jobName, jobParameters);
        ExecutionContext executionContext;

        // existing job instance found
        if (jobInstance != null) {

            List<JobExecution> executions = jobExecutionDao.findJobExecutions(jobInstance);

            if (executions.isEmpty()) {
                throw new IllegalStateException("Cannot find any job execution for job instance: " + jobInstance);
            }

            // check for running executions and find the last started
            for (JobExecution execution : executions) {
                if (execution.isRunning() || execution.isStopping()) {
                    throw new JobExecutionAlreadyRunningException("A job execution for this job is already running: "
                            + jobInstance);
                }
                BatchStatus status = execution.getStatus();
                if (status == BatchStatus.UNKNOWN) {
                    throw new JobRestartException("Cannot restart job from UNKNOWN status. "
                            + "The last execution ended with a failure that could not be rolled back, "
                            + "so it may be dangerous to proceed. Manual intervention is probably necessary.");
                }
                Collection<JobParameter> allJobParameters = execution.getJobParameters().getParameters().values();
                long identifyingJobParametersCount = allJobParameters.stream().filter(JobParameter::isIdentifying).count();
                if (identifyingJobParametersCount > 0 && (status == BatchStatus.COMPLETED || status == BatchStatus.ABANDONED)) {
                    throw new JobInstanceAlreadyCompleteException(
                            "A job instance already exists and is complete for parameters=" + jobParameters
                            + ".  If you want to run this job again, change the parameters.");
                }
            }
            executionContext = ecDao.getExecutionContext(jobExecutionDao.getLastJobExecution(jobInstance));
        }
        else {
            // no job found, create one
            jobInstance = jobInstanceDao.createJobInstance(jobName, jobParameters);
            executionContext = new ExecutionContext();
        }

        JobExecution jobExecution = new JobExecution(jobInstance, jobParameters, null);
        jobExecution.setExecutionContext(executionContext);
        jobExecution.setLastUpdated(new Date(System.currentTimeMillis()));

        // Save the JobExecution so that it picks up an ID (useful for clients
        // monitoring asynchronous executions):
        jobExecutionDao.saveJobExecution(jobExecution);
        ecDao.saveExecutionContext(jobExecution);

        return jobExecution;

    }

根据createJobExecution,如果作业示例已经存在,并且作业的最后一次执行失败,则该方法使用增量id保存新的执行。
上面写着“如果这个方法是事务性的,并且隔离级别是REPEATABLE_READ或更好,那么另一个启动器尝试在另一个线程或进程中启动相同的作业将阻塞,直到这个事务完成为止”,然而,即使这个方法是事务性的,并且用“SERIALIZABLE”级别隔离它不使用排他锁来查询作业的执行,使得另一个启动器可以同时获得最后的执行。
当然,一旦另一个启动器在第一个启动器提交事务之前获得最后一次执行,它将保存新的执行并执行自己的任务。
我试图找出是否有任何提示,以防止这种情况,但我不能。我需要你的帮助。谢谢。
综上所述:我认为当多个批处理应用同时触发时,同一个作业可以执行两次或更多次,并且作业的最后一次执行状态为FAILED。我说的对吗?请给予我一些提示。

xeufq47z

xeufq47z1#

当最后一次执行失败时,执行相同的作业怎么样?
创建作业执行的方法被设计为事务性的。正如您分享的文档中所提到的,隔离级别REPEATABLE_READ应该在大多数情况下都可以工作。如果这还不够(即您仍然看到为同一作业示例启动的并发作业执行),那么您需要将隔离级别提高到SERIALIZABLE。这将真正在数据库级别上串行化并发访问,Spring Batch对此无能为力。
现在来回答您关于失败情况的问题,一旦您在作业存储库上设置了SERIALIZABLE隔离级别,幸运的赢家将能够访问createJobExecution方法的进程(进程或线程)将检查具有给定名称和标识作业参数的作业示例的存在,并且如果已经存在失败的执行,将创建用于相同作业示例的新执行,以从作业在前一次运行中停止的地方恢复作业。

相关问题