csv Spring Batch避免在tasklet之前启动Reader和Writer

iih3973s  于 2023-05-20  发布在  Spring
关注(0)|答案(3)|浏览(186)

我正在使用spring batch,有一个两步的作业,第一步(tasklet)验证头CSV,第二步读取CSV文件并写入另一个CSV文件,如下所示:

@Bean
public ClassifierCompositeItemWriter<POJO> classifierCompositeItemWriter() throws Exception {
    Classifier<POJO, ItemWriter<? super POJO>> classifier = new ClassiItemWriter(ClassiItemWriter.itemWriter());
    return new ClassifierCompositeItemWriterBuilder<POJO>()
            .classifier(classifier)
            .build();
}


@Bean
public Step readAndWriteCsvFile() throws Exception {
    return stepBuilderFactory.get("readAndWriteCsvFile")
            .<POJO, POJO>chunk(10000)
            .reader(ClassitemReader.itemReader())
            .processor(processor())
            .writer(classifierCompositeItemWriter())
            .build();
}

在阅读CSV之前,我使用了FlatFileItemReader(在ClassitemReader中)和FlatFileItemWriter(在ClassiItemWriter中)。我通过tasklet检查CSV文件的头是否正确,如下所示:

@Bean
public Step fileValidatorStep() {
    return stepBuilderFactory
            .get("fileValidatorStep")
            .tasklet(fileValidator)
            .build();
}

如果是这样,我处理从CSV文件接收到另一个文件CSV的转换。
jobBuilderFactory i中检查ExistStatus是否来自tasklet fileValidatorStep是“COMPLETED”以将进程转发到readAndWriteCsvFile(),如果不是“COMPLETED”并且tasklet fileValidatorStep返回ExistStatus“ERROR”则作业结束并退出处理。

@Bean
public Job job() throws Exception {
    return jobBuilderFactory.get("job")
            .incrementer(new RunIdIncrementer())
            .start(fileValidatorStep()).on("ERROR").end()
            .next(fileValidatorStep()).on("COMPLETED").to(readAndWriteCsvFile())
            .end().build();
}

问题是,当我启动我的作业时,Bean readAndWriteCsvFile首先运行tasklet,这意味着在我可以验证头部并检查ExistStatus之前,spring batch的读取器和写入器的标准Bean总是在生命周期中加载,读取器仍在工作并读取文件并将数据放入另一个文件而不进行检查,因为在所有tasklet之前启动作业期间加载Bean。
如何在fileValidatorStep之后启动readAndWriteCsvFile方法?

wgxvkvu9

wgxvkvu91#

你不需要一个流动的工作,一个简单的工作就足够了。下面是一个简单的例子:

import java.util.Arrays;

import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.StepContribution;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.batch.core.scope.context.ChunkContext;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.item.support.ListItemReader;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
@EnableBatchProcessing
public class MyJobConfiguration {

    @Bean
    public Step validationStep(StepBuilderFactory stepBuilderFactory) {
        return stepBuilderFactory.get("validationStep")
                .tasklet(new Tasklet() {
                    @Override
                    public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
                        if(!isValid()) {
                            throw new Exception("Invalid file");
                        }
                        return RepeatStatus.FINISHED;
                    }

                    private boolean isValid() {
                        // TODO implement validation logic
                        return false;
                    }
                })
                .build();
    }

    @Bean
    public Step readAndWriteCsvFile(StepBuilderFactory stepBuilderFactory) {
        return stepBuilderFactory.get("readAndWriteCsvFile")
                .<Integer, Integer>chunk(2)
                .reader(new ListItemReader<>(Arrays.asList(1, 2, 3, 4)))
                .writer(items -> items.forEach(System.out::println))
                .build();
    }

    @Bean
    public Job job(JobBuilderFactory jobBuilderFactory, StepBuilderFactory stepBuilderFactory) {
        return jobBuilderFactory.get("job")
                .start(validationStep(stepBuilderFactory))
                .next(readAndWriteCsvFile(stepBuilderFactory))
                .build();
    }

    public static void main(String[] args) throws Exception {
        ApplicationContext context = new AnnotationConfigApplicationContext(MyJobConfiguration.class);
        JobLauncher jobLauncher = context.getBean(JobLauncher.class);
        Job job = context.getBean(Job.class);
        jobLauncher.run(job, new JobParameters());
    }

}

在本例中,如果validationStep失败,则不会执行下一步。

avwztpqn

avwztpqn2#

我解决了我的问题,我用注解@StepScope改变了Job类配置中的bean FlatFileItemReader方法,现在这个bean只有在我需要它的时候才有负载,也应该避免将FlatFileItemReader bean声明到作业的范围之外

rkttyhzu

rkttyhzu3#

我也遇到过类似的问题。我有2个步骤与2个不同的readread总是比process先运行。然后我在两个read中都使用了@Step注解,它工作了。

相关问题