Spring 批次:在读写器之间传递数据

sbdsn5lh  于 2023-04-19  发布在  Spring
关注(0)|答案(3)|浏览(121)

我想在Writer中获取我在步骤的Reader中设置的数据。我通过www.example.com了解ExecutionContexts(步骤和作业)和ExecutionContextPromotionListenerhttp://docs.spring.io/spring-batch/trunk/reference/html/patterns.html#passingDataToFutureSteps
问题是,在Writer中,我正在检索'npag'的空值。
ItemWriter上的行:

LOG.info("INSIDE WRITE, NPAG: " + nPag);

我一直在做一些工作区没有运气,寻找其他similar questions的答案...任何帮助?谢谢!
下面是我的代码:

阅读器

@Component
public class LCItemReader implements ItemReader<String> {

private StepExecution stepExecution;

private int nPag = 1;

@Override
public String read() throws CustomItemReaderException {

    ExecutionContext stepContext = this.stepExecution.getExecutionContext();
    stepContext.put("npag", nPag);
    nPag++;
    return "content";
}

@BeforeStep
public void saveStepExecution(StepExecution stepExecution) {
    this.stepExecution = stepExecution;
}
}

作者

@Component
@StepScope
public class LCItemWriter implements ItemWriter<String> {

private String nPag;

@Override
public void write(List<? extends String> continguts) throws Exception {
    try {
        LOG.info("INSIDE WRITE, NPAG: " + nPag);
    } catch (Throwable ex) {
        LOG.error("Error: " + ex.getMessage());
    }
}

@BeforeStep
public void retrieveInterstepData(StepExecution stepExecution) {
    JobExecution jobExecution = stepExecution.getJobExecution();
    ExecutionContext jobContext = jobExecution.getExecutionContext();
    this.nPag = jobContext.get("npag").toString();
}
}

作业/步骤批处理配置

@Bean
public Job lCJob() {
    return jobs.get("lCJob")
            .listener(jobListener)
            .start(lCStep())
            .build();
}

@Bean
public Step lCStep() {
    return steps.get("lCStep")
            .<String, String>chunk(1)
            .reader(lCItemReader)
            .processor(lCProcessor)
            .writer(lCItemWriter)
            .listener(promotionListener())
            .build();
}

听众

@Bean
public ExecutionContextPromotionListener promotionListener() {
    ExecutionContextPromotionListener executionContextPromotionListener = new ExecutionContextPromotionListener();
    executionContextPromotionListener.setKeys(new String[]{"npag"});
    return executionContextPromotionListener;
}
ccrfmcuu

ccrfmcuu1#

ExecutionContextPromotionListener特别声明它在一个步骤的末尾工作,所以这将是在writer执行之后。所以我认为你所指望的提升并没有在你认为它会发生的时候发生。
如果我是你,我会在步骤上下文中设置它,如果你需要一个步骤中的值,我会从步骤中获取它。
另一个方面是@BeforeStep。它标记了一个在步骤上下文存在之前执行的方法。在读取器中设置nPag值的方式是在步骤开始执行之后。

ppcbkaq5

ppcbkaq52#

在读取器中设置nPag之前,您就试图读取nPag的值,结果得到的默认值为null。您需要在记录时直接从执行上下文读取nPag上的值。您可以保留对jobContext的引用。请尝试以下操作

@Component
@StepScope
public class LCItemWriter implements ItemWriter<String> {

private String nPag;
private ExecutionContext jobContext;

@Override
public void write(List<? extends String> continguts) throws Exception {
    try {
        this.nPag = jobContext.get("npag").toString();
        LOG.info("INSIDE WRITE, NPAG: " + nPag);
    } catch (Throwable ex) {
        LOG.error("Error: " + ex.getMessage());
    }
}

@BeforeStep
public void retrieveInterstepData(StepExecution stepExecution) {
    JobExecution jobExecution = stepExecution.getJobExecution();
    jobContext = jobExecution.getExecutionContext();

}
}
jgwigjjp

jgwigjjp3#

在你的Reader和Writer中,你需要实现ItemStream接口,并使用ExecutionContext作为成员变量。这里我给出了一个使用Processor而不是Writer的例子,但同样也适用于Writer。它对我来说很好,我可以从读取器到处理器获取值。
我已经在reader中设置了上下文中的值,并在processor中获取了值。

public class EmployeeItemReader implements ItemReader<Employee>, ItemStream {

    ExecutionContext context;
    @Override
    public Employee read() throws Exception, UnexpectedInputException, ParseException, NonTransientResourceException {
        context.put("ajay", "i am going well");
        Employee emp=new Employee();
        emp.setEmpId(1);
        emp.setFirstName("ajay");
        emp.setLastName("goswami");
        return emp;
    }
    @Override
    public void close() throws ItemStreamException {
        // TODO Auto-generated method stub
    }

    @Override
    public void open(ExecutionContext arg0) throws ItemStreamException {
        context = arg0;
    }

    @Override
    public void update(ExecutionContext arg0) throws ItemStreamException {
        // TODO Auto-generated method stub
        context = arg0;
    }

}

我的处理器

public class CustomItemProcessor implements ItemProcessor<Employee,ActiveEmployee>,ItemStream{
    ExecutionContext context;
    @Override
    public ActiveEmployee process(Employee emp) throws Exception {
        //See this line
        System.out.println(context.get("ajay"));

        ActiveEmployee actEmp=new ActiveEmployee();
        actEmp.setEmpId(emp.getEmpId());
        actEmp.setFirstName(emp.getFirstName());
        actEmp.setLastName(emp.getLastName());
        actEmp.setAdditionalInfo("Employee is processed");
        return actEmp;
    }
    @Override
    public void close() throws ItemStreamException {
        // TODO Auto-generated method stub

    }
    @Override
    public void open(ExecutionContext arg0) throws ItemStreamException {
        // TODO Auto-generated method stub

    }
    @Override
    public void update(ExecutionContext arg0) throws ItemStreamException {
        context = arg0;

    }

}

希望这能帮上忙。

相关问题