java - Running a Spring Batch job with dynamic parameters in a foreach loop

z9ju0rcb posted on 2021-06-30 in Java

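I created a Spring Batch job with Spring Boot. I customized the reader to fetch JSON data from a REST API and convert it into Java objects, and the writer pushes the data to a queue. I launch the job inside a foreach loop so that I can set the parameters and send requests to the REST API in different languages. On the first iteration the job runs successfully, but on every later iteration it only reports that it has finished.
Batch configuration: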

@Configuration
@EnableBatchProcessing
public class BatchConfiguration {

    @Autowired
    public JobBuilderFactory jobBuilderFactory; 

    @Autowired
    public StepBuilderFactory stepBuilderFactory;

    @Autowired
    public RestWebClient webClient;

    @Bean
    public ItemReader<Code> reader() {
        return new CodeAndLabelRestItemReader(webClient);
    }

    @Bean
    public CodeAndLabelItemProcessor processor() {
        return new CodeAndLabelItemProcessor("France","DP","transaction");
    }

    @Bean
    public ItemWriter<CodeAndLabel> calWriter(AmqpTemplate amqpTemplate) {

        return new CodeAndLabelItemWriter(amqpTemplate);             
    }

    @Bean(name = "importJob")
    public Job importCodesAndLabelsJob(JobCompletionNotificationListener listener, Step stepJms) {
        return jobBuilderFactory.get("importJob")
                .incrementer(new RunIdIncrementer())
                .listener(listener)
                .flow(stepJms)
                .end()
                .build();
    }

    @Bean
    public Step stepJms(ItemWriter<CodeAndLabel> writer) {
        return stepBuilderFactory.get("stepJms")
                .<Code, CodeAndLabel>chunk(10)
                .reader(reader())
                .processor(processor())
                .writer(writer)
                .build();
    }

Reader:

public class CodeAndLabelRestItemReader implements ItemReader<Code>{

private final RestWebClient webClient;

private int nextCodeIndex;
private List<Code> codes;

public CodeAndLabelRestItemReader(RestWebClient webClient) {
    this.webClient = webClient;     
    nextCodeIndex = 0;
}

@BeforeStep
public void beforeStep(final StepExecution stepExecution) {
    JobParameters jobParameters = stepExecution.getJobParameters();
    this.webClient.setEndPointSuffix(jobParameters.getString("endPointSuffix"));
}

@Override
public Code read()  {
    if(codesAndLabelsListNotInitialized()) {
        codes = webClient.getCodes();
    }

    Code nextCode = null;

    if (nextCodeIndex < codes.size()) {
        nextCode = codes.get(nextCodeIndex);
        nextCodeIndex++;
    }

    return nextCode;
}

private boolean codesAndLabelsListNotInitialized() {
    return this.codes == null;
}

}
Processor:

public class CodeAndLabelItemProcessor implements ItemProcessor<Code, CodeAndLabel> {

private String populationId;
private String populationDataProvider;
private String transactionId;

public CodeAndLabelItemProcessor(String populationId, String populationDataProvider, String transactionId) {
    this.populationId = populationId;
    this.populationDataProvider = populationDataProvider;
    this.transactionId = transactionId; 
}

@Override
public CodeAndLabel process(Code code) throws Exception {

    CodeAndLabel codeAndLabel = new CodeAndLabel();

    codeAndLabel.setUid(code.getUid());

    System.out.println("Converting (" + code + ") into (" + codeAndLabel + ")");

    return codeAndLabel;
}

}
Writer:

public class CodeAndLabelItemWriter implements ItemWriter<CodeAndLabel>{

private AmqpTemplate template;

public CodeAndLabelItemWriter(AmqpTemplate template) {
    this.template = template;
}

@Override
public void write(List<? extends CodeAndLabel> items) throws Exception {

    if (log.isDebugEnabled()) {
        log.debug("Writing to RabbitMQ with " + items.size() + " items.");
    }

    for(CodeAndLabel item : items) {
        template.convertAndSend(BatchConfiguration.topicExchangeName,"com.batchprocessing.queue",item);
        System.out.println("item : "+item);
    }

}

}

Listener:

@Component
public class JobCompletionNotificationListener extends JobExecutionListenerSupport {

@Autowired
private JdbcTemplate jdbcTemplate;

@Override
public void afterJob(JobExecution jobExecution) {
    if (jobExecution.getStatus() == BatchStatus.COMPLETED) {
        System.out.println("JOB FINISHED");

    }
}

}
And the class that runs the job:

@Component
public class Initialization {
    // some code here
    String[] languages = processLanguage.split(";");
    for (String language : languages) {

        JobParameters params = new JobParametersBuilder()
                .addString("JobID", String.valueOf(System.currentTimeMillis()))
                .addString("endPointSuffix", "/codeAndLabel".concat(language.toUpperCase()))
                .toJobParameters();
        jobLauncher.run(job, params);

    }

Output for the first iteration:

Converting (WFR.SP.2C) into (WFR.SP.2C)
Converting (WFR.SP.3E) into (WFR.SP.3E)
Converting (WFR.SP.FC) into (WFR.SP.FC)
Converting (WFR.SP.FD) into (WFR.SP.FD)
Converting (WFR.SP.FI) into (WFR.SP.FI)
Converting (WFR.SP.FM) into (WFR.SP.FM)
item : WFR.SP.2C
item : WFR.SP.3E
item : WFR.SP.FC 
item : WFR.SP.FD
item : WFR.SP.FI
item : WFR.SP.FM
JOB FINISHED

Second iteration:

JOB FINISHED

I think that on the second iteration the job does not run the reader, processor, and writer beans, and I don't know why. Can anyone help?
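
One possible explanation, offered as a sketch and not a confirmed diagnosis: the reader is created once as a singleton bean, and its `codes` and `nextCodeIndex` fields are never reset. On the second launch `codes` is already non-null and `nextCodeIndex` already equals `codes.size()`, so the first `read()` returns `null` and the step ends with nothing to process. The plain-Java snippet below reproduces only that logic without Spring. `StatefulReaderDemo`, `CachingReader`, `runOnce`, and `reset` are hypothetical names, and the `Supplier` stands in for `webClient.getCodes()`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Supplier;

class StatefulReaderDemo {

    /** Mirrors the read() logic of CodeAndLabelRestItemReader, without Spring. */
    static class CachingReader {
        // Hypothetical stand-in for webClient.getCodes()
        private final Supplier<List<String>> source;
        private List<String> codes;
        private int nextCodeIndex = 0;

        CachingReader(Supplier<List<String>> source) {
            this.source = source;
        }

        /** Hypothetical reset hook: clears the cached list and the cursor. */
        void reset() {
            codes = null;
            nextCodeIndex = 0;
        }

        String read() {
            if (codes == null) {
                codes = source.get();
            }
            String next = null;
            if (nextCodeIndex < codes.size()) {
                next = codes.get(nextCodeIndex);
                nextCodeIndex++;
            }
            return next; // null signals "no more items"
        }
    }

    /** Drains the reader until read() returns null, as a chunk-oriented step does. */
    static List<String> runOnce(CachingReader reader) {
        List<String> items = new ArrayList<>();
        String item;
        while ((item = reader.read()) != null) {
            items.add(item);
        }
        return items;
    }

    public static void main(String[] args) {
        CachingReader reader =
                new CachingReader(() -> Arrays.asList("WFR.SP.2C", "WFR.SP.3E"));

        System.out.println("run 1: " + runOnce(reader)); // run 1: [WFR.SP.2C, WFR.SP.3E]
        System.out.println("run 2: " + runOnce(reader)); // run 2: []

        reader.reset();
        System.out.println("run 3: " + runOnce(reader)); // run 3: [WFR.SP.2C, WFR.SP.3E]
    }
}
```

If this is indeed the cause, the corresponding changes in the Spring Batch code would be to clear `codes` and `nextCodeIndex` in the reader's existing `beforeStep` method, or to declare the reader bean with `@StepScope` so that each step execution gets a fresh instance. Neither has been verified against this project.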
