Java — 在 Spring Batch 中使用 Tasklet 还是 CompositeItemWriter 来实现按类型写入的逻辑?

mec1mxoz  于 2021-07-23  发布在  Java
关注(0)|答案(1)|浏览(266)

我已经创建了3个写入器,用来生成3个本地文件,但读取到的所有数据都原样写入了每一个文件。我想加入一些逻辑,让每条记录只进入它对应的文件。例如:创建 filea、fileb、filec。读取器查询 MongoDB 时,应该解析 business 列并区分出3种不同的类型:typea、typeb 和 typec。读取器识别出类型之后,应该把记录写入对应的文件,而不是把所有内容都写入每个文件。filea 应该只包含 typea 的数据,除此之外不应该有任何其他内容。有办法做到吗?下面是我的代码。
项目阅读器:

/**
 * Creates the reader that streams every {@code PaymentAudit} document from MongoDB.
 *
 * <p>The query {@code "{}"} matches all documents; results are sorted by {@code _id}
 * ascending.
 *
 * @return the configured Mongo item reader
 */
@Bean
public MongoItemReader<PaymentAudit> mongoreader() {
    LOG.debug("Mongo-Reader");

    // Diamond operator instead of a raw type, so no unchecked-warning suppression is needed.
    MongoItemReader<PaymentAudit> mongoreader = new MongoItemReader<>();
    mongoreader.setTemplate(mongoTemplate);
    mongoreader.setQuery("{}");
    mongoreader.setTargetType(PaymentAudit.class);

    // Plain map instead of double-brace initialization, which would create an anonymous
    // HashMap subclass holding a reference to the enclosing configuration instance.
    HashMap<String, Sort.Direction> sort = new HashMap<>();
    sort.put("_id", Direction.ASC);
    mongoreader.setSort(sort);

    return mongoreader;
}

文件项编写器:

/**
 * Builds the step-scoped flat-file writer that exports to {@code retail.txt} with the
 * header line {@code TypeA}.
 *
 * @return the configured flat-file writer
 */
@StepScope
@Bean
public FlatFileItemWriter<PaymentAudit> writer() {
    LOG.debug("Mongo-writer");
    String exportFilePath = "C:\\filewriter\\retail.txt";
    String exportFileHeader = "TypeA";

    // The name must be non-blank: the builder rejects a blank name while state saving is
    // enabled (the default). It should also be unique per writer, since it prefixes the
    // keys the writer stores in the execution context.
    return new FlatFileItemWriterBuilder<PaymentAudit>()
            .name("retailFileWriter")
            .resource(new FileSystemResource(exportFilePath))
            .lineAggregator(createPaymentPortalLineAggregator())
            .headerCallback(new StringHeaderWriter(exportFileHeader))
            .build();
}
/**
 * Builds the flat-file writer that exports to {@code hcc.txt} with the header line
 * {@code TypeB}.
 *
 * @return the configured flat-file writer
 */
@Bean
public FlatFileItemWriter<PaymentAudit> writer2() {
    LOG.debug("flatFileItemWriter");
    String exportFilePath = "C:\\filewriter\\hcc.txt";
    String exportFileHeader = "TypeB";

    // The name must be non-blank: the builder rejects a blank name while state saving is
    // enabled (the default). It should also be unique per writer, since it prefixes the
    // keys the writer stores in the execution context.
    return new FlatFileItemWriterBuilder<PaymentAudit>()
            .name("hccFileWriter")
            .resource(new FileSystemResource(exportFilePath))
            .lineAggregator(createPaymentPortalLineAggregator())
            .headerCallback(new StringHeaderWriter(exportFileHeader))
            .build();
}
/**
 * Builds the flat-file writer that exports to {@code srx.txt} with the header line
 * {@code TypeC}.
 *
 * @return the configured flat-file writer
 */
@Bean
public FlatFileItemWriter<PaymentAudit> writer3() {
    LOG.debug("Mongo-writer");
    String exportFilePath = "C:\\filewriter\\srx.txt";
    String exportFileHeader = "TypeC";

    // The name must be non-blank: the builder rejects a blank name while state saving is
    // enabled (the default). It should also be unique per writer, since it prefixes the
    // keys the writer stores in the execution context.
    return new FlatFileItemWriterBuilder<PaymentAudit>()
            .name("srxFileWriter")
            .resource(new FileSystemResource(exportFilePath))
            .lineAggregator(createPaymentPortalLineAggregator())
            .headerCallback(new StringHeaderWriter(exportFileHeader))
            .build();
}

// NOTE(review): this method is cut off in the post. The visible code only instantiates a
// raw CompositeItemWriter; no delegates are set, nothing is returned, and the closing
// brace is missing.
@SuppressWarnings({ "unchecked", "rawtypes" })
public CompositeItemWriter<PaymentAudit> compositeItemWriter(){
    CompositeItemWriter writer = new CompositeItemWriter();

  /**
   * Creates the aggregator that turns a {@code PaymentAudit} into one pipe-delimited line.
   *
   * @return a delimited line aggregator using {@code |} as the delimiter and the field
   *         extractor from {@code createPaymentPortalFieldExtractor()}
   */
  private LineAggregator<PaymentAudit> createPaymentPortalLineAggregator() {
    final DelimitedLineAggregator<PaymentAudit> aggregator = new DelimitedLineAggregator<>();
    aggregator.setFieldExtractor(createPaymentPortalFieldExtractor());
    aggregator.setDelimiter("|");
    return aggregator;
  }

/**
 * Creates the extractor that selects which {@code PaymentAudit} bean properties are
 * written, and in which order.
 *
 * @return a bean-wrapper field extractor configured with the exported property names
 */
private FieldExtractor<PaymentAudit> createPaymentPortalFieldExtractor() {
    BeanWrapperFieldExtractor<PaymentAudit> extractor = new BeanWrapperFieldExtractor<>();
    // One array element per property. The previous single string "TypeA, TypeB, TypeC"
    // was treated as one property name containing commas.
    // NOTE(review): confirm these match the actual PaymentAudit property names.
    extractor.setNames(new String[] {"TypeA", "TypeB", "TypeC"});
    return extractor;
}
bf1o4zei

bf1o4zei1#

你需要使用 ClassifierCompositeItemWriter 对每个项目进行分类,并把每种类型写入各自对应的文件。下面是一个可以参考的简单示例:

import java.util.Arrays;

import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.file.FlatFileItemWriter;
import org.springframework.batch.item.file.builder.FlatFileItemWriterBuilder;
import org.springframework.batch.item.file.transform.PassThroughLineAggregator;
import org.springframework.batch.item.support.ClassifierCompositeItemWriter;
import org.springframework.batch.item.support.ListItemReader;
import org.springframework.classify.Classifier;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.FileSystemResource;

/**
 * Sample Spring Batch job that routes each item to a different file according to a
 * classifier.
 *
 * <p>The job reads four in-memory {@link Person} items. Those whose name starts with
 * {@code "foo"} are written to {@code foos.txt}; all others go to {@code bars.txt}.
 */
@Configuration
@EnableBatchProcessing
public class MyJob {

    // Injected once through the constructor and never reassigned.
    private final JobBuilderFactory jobBuilderFactory;

    private final StepBuilderFactory stepBuilderFactory;

    /**
     * @param jobBuilderFactory  factory used to build the job
     * @param stepBuilderFactory factory used to build the step
     */
    public MyJob(JobBuilderFactory jobBuilderFactory, StepBuilderFactory stepBuilderFactory) {
        this.jobBuilderFactory = jobBuilderFactory;
        this.stepBuilderFactory = stepBuilderFactory;
    }

    /**
     * Supplies a fixed, in-memory list of four people (two "foo", two "bar").
     *
     * @return a reader over the sample items
     */
    @Bean
    public ItemReader<Person> itemReader() {
        return new ListItemReader<>(Arrays.asList(
                new Person(1, "foo1"),
                new Person(2, "foo2"),
                new Person(3, "bar1"),
                new Person(4, "bar2")));
    }

    /**
     * Creates the writer that picks a delegate writer per item.
     *
     * @param fooItemWriter delegate for people whose name starts with {@code "foo"}
     * @param barItemWriter delegate for every other person
     * @return the classifying composite writer
     */
    @Bean
    public ClassifierCompositeItemWriter<Person> classifierCompositeItemWriter(ItemWriter<Person> fooItemWriter, ItemWriter<Person> barItemWriter) {
        ClassifierCompositeItemWriter<Person> classifierCompositeItemWriter = new ClassifierCompositeItemWriter<>();
        classifierCompositeItemWriter.setClassifier((Classifier<Person, ItemWriter<? super Person>>) person -> {
            if (person.getName().startsWith("foo")) {
                return fooItemWriter;
            } else {
                return barItemWriter;
            }
        });
        return classifierCompositeItemWriter;
    }

    /**
     * @return the flat-file writer targeting {@code foos.txt}; each item is written via
     *         its {@code toString()} by the pass-through aggregator
     */
    @Bean
    public FlatFileItemWriter<Person> fooItemWriter() {
        return new FlatFileItemWriterBuilder<Person>()
                .name("fooItemWriter")
                .resource(new FileSystemResource("foos.txt"))
                .lineAggregator(new PassThroughLineAggregator<>())
                .build();
    }

    /**
     * @return the flat-file writer targeting {@code bars.txt}; each item is written via
     *         its {@code toString()} by the pass-through aggregator
     */
    @Bean
    public FlatFileItemWriter<Person> barItemWriter() {
        return new FlatFileItemWriterBuilder<Person>()
                .name("barItemWriter")
                .resource(new FileSystemResource("bars.txt"))
                .lineAggregator(new PassThroughLineAggregator<>())
                .build();
    }

    /**
     * Builds the chunk-oriented step (chunk size 2) that reads people and writes them
     * through the classifying writer.
     *
     * @return the data extraction step
     */
    @Bean
    public Step dataExtractionStep() {
        return stepBuilderFactory.get("dataExtractionStep")
                .<Person, Person>chunk(2)
                .reader(itemReader())
                .writer(classifierCompositeItemWriter(fooItemWriter(), barItemWriter()))
                // The delegate file writers are registered as streams explicitly so the
                // step opens and closes them; the classifying writer is not itself a
                // stream and will not do it for them.
                .stream(fooItemWriter())
                .stream(barItemWriter())
                .build();
    }

    /**
     * @return the job consisting of the single data extraction step
     */
    @Bean
    public Job dataExtractionJob() {
        return jobBuilderFactory.get("dataExtractionJob")
                .start(dataExtractionStep())
                .build();
    }

    /**
     * Boots the application context, runs the job once with empty parameters, then
     * closes the context.
     *
     * @param args unused
     * @throws Exception if the job cannot be launched
     */
    public static void main(String[] args) throws Exception {
        // try-with-resources closes the context (and its beans) once the job has run.
        try (AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext(MyJob.class)) {
            JobLauncher jobLauncher = context.getBean(JobLauncher.class);
            Job job = context.getBean(Job.class);
            jobLauncher.run(job, new JobParameters());
        }
    }

    /**
     * Simple mutable bean with an id and a name, used as the item type of the sample job.
     */
    public static class Person {

        private int id;

        private String name;

        /** Creates a person with default field values. */
        public Person() {
        }

        /**
         * @param id   the person's identifier
         * @param name the person's name
         */
        public Person(int id, String name) {
            this.id = id;
            this.name = name;
        }

        public int getId() {
            return id;
        }

        public void setId(int id) {
            this.id = id;
        }

        public String getName() {
            return name;
        }

        public void setName(String name) {
            this.name = name;
        }

        /** @return a text form such as {@code Person{id=1, name='foo1'}} */
        @Override
        public String toString() {
            return "Person{" +
                    "id=" + id +
                    ", name='" + name + '\'' +
                    '}';
        }
    }

}

此示例读取一些 Person 项目,并把名称以 foo 开头的项目写入 foos.txt,把名称以 bar 开头的项目写入 bars.txt。
希望这有帮助。

相关问题