Can we process data in Spring Batch after the batch job has completed?

Posted by yb3bgrhw on 2022-12-10 in Spring

I am using Spring Batch to read data from a database, process it, and then do some further processing in the writer.
If the chunk size is smaller than the number of records the reader returns, Spring Batch processes the data in multiple chunks, so the writer is called once per chunk. I want the processing in the writer to happen only once, after all chunks have been processed. If that is not possible, I will remove the writer and process the data produced by the processor after the batch job has completed. Is this possible?
Below is the code that triggers my Spring Batch job:

private void triggerSpringBatchJob() {
        loggerConfig.logDebug(log, " : Triggering product catalog scheduler ");
        
        JobParametersBuilder builder = new JobParametersBuilder();

        try {

            // Adding date in buildJobParameters because if not added we will get A job
            // instance already exists: JobInstanceAlreadyCompleteException
            builder.addDate("date", new Date());
            jobLauncher.run(processProductCatalog, builder.toJobParameters());

        } catch (JobExecutionAlreadyRunningException | JobRestartException | JobInstanceAlreadyCompleteException
                | JobParametersInvalidException e) {

            e.printStackTrace();

        }
    }

Below is my Spring Batch configuration:

@Configuration
@EnableBatchProcessing
public class BatchJobProcessConfiguration {
    

    
    @Bean
    @StepScope
    RepositoryItemReader<Tuple> reader(SkuRepository skuRepository,
            ProductCatalogConfiguration productCatalogConfiguration) {

        RepositoryItemReader<Tuple> reader = new RepositoryItemReader<>();
        reader.setRepository(skuRepository);
        // query parameters
        List<Object> queryMethodArguments = new ArrayList<>();
        
        
        if (productCatalogConfiguration.getSkuId().isEmpty()) {
            reader.setMethodName("findByWebEligibleAndDiscontinued");
            queryMethodArguments.add(productCatalogConfiguration.getWebEligible()); // for web eligible
            queryMethodArguments.add(productCatalogConfiguration.getDiscontinued()); // for discontinued
            queryMethodArguments.add(productCatalogConfiguration.getCbdProductId()); // for cbd products
        } else {
            reader.setMethodName("findBySkuIds");
            queryMethodArguments.add(productCatalogConfiguration.getSkuId()); // for sku ids
        }

        reader.setArguments(queryMethodArguments);

        reader.setPageSize(1000);
        Map<String, Direction> sorts = new HashMap<>();
        sorts.put("sku_id", Direction.ASC);
        reader.setSort(sorts);

        return reader;
    }

    @Bean
    @StepScope
    ItemWriter<ProductCatalogWriterData> writer() {
        return new ProductCatalogWriter();
    }

    @Bean
    ProductCatalogProcessor processor() {
        return new ProductCatalogProcessor();
    }
    
    @Bean
    SkipPolicy readerSkipper() {
        return new ReaderSkipper();
    }

    @Bean
    Step productCatalogDataStep(ItemReader<Tuple> itemReader, ProductCatalogWriter writer,
            HttpServletRequest request, StepBuilderFactory stepBuilderFactory,BatchConfiguration batchConfiguration) {
        return stepBuilderFactory.get("processProductCatalog").<Tuple, ProductCatalogWriterData>chunk(batchConfiguration.getBatchChunkSize())
                .reader(itemReader).faultTolerant().skipPolicy(readerSkipper()).processor(processor()).writer(writer).build();
    }

    
    @Bean
    Job productCatalogData(Step productCatalogDataStep, HttpServletRequest request,
            JobBuilderFactory jobBuilderFactory) {
        return jobBuilderFactory.get("processProductCatalog").incrementer(new RunIdIncrementer())
                .flow(productCatalogDataStep).end().build();
    }

}
Answer #1, by yeotifhr

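"I want to do the processing in writer only once at the end of all batch process completion or if this is not possible then I will remove writer and process the data obtained in processor after batch job is completed. Is this possible?"
"At the end of all batch process completion" is the key here. If the requirement is to do some processing after all chunks have been "pre-processed", I would keep it simple and use two steps: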

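  • Step 1: (pre-)process the data as needed and write it to temporary storage
  • Step 2: do whatever processing you need on the prepared data in the temporary storage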

A final step would clean up the temporary storage if it is persistent (a file, a staging table, etc.). If it is in memory, the cleanup is optional.
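
To make the two-step idea concrete, here is a minimal, framework-free sketch in plain Java. It is not Spring Batch code: the class `TwoStepSketch`, its in-memory `staging` list, and the `"sku-"` prefix standing in for the processor are all hypothetical names chosen for illustration. In a real job, step 1 would presumably be the existing chunk-oriented step with a writer that only stores items, and step 2 a separate step chained after it.

```java
import java.util.ArrayList;
import java.util.List;

/** Illustrative only: simulates "stage per chunk, then process once". */
public class TwoStepSketch {

    // Hypothetical temporary storage shared by both steps (in memory here;
    // it could just as well be a file or a staging table).
    private final List<String> staging = new ArrayList<>();

    public int chunksWritten = 0;   // how many times the step 1 "writer" ran
    public int finalRuns = 0;       // how many times the step 2 logic ran
    public List<String> finalOutput = new ArrayList<>();

    /** Step 1: chunk-oriented pass; the "writer" only stages the items. */
    public void stepOne(List<Integer> input, int chunkSize) {
        for (int start = 0; start < input.size(); start += chunkSize) {
            int end = Math.min(start + chunkSize, input.size());
            List<String> chunk = new ArrayList<>();
            for (Integer raw : input.subList(start, end)) {
                chunk.add("sku-" + raw); // stand-in for the item processor
            }
            staging.addAll(chunk);       // stand-in for the item writer
            chunksWritten++;
        }
    }

    /** Step 2: runs once over everything staged, then cleans up. */
    public void stepTwo() {
        finalRuns++;
        finalOutput = new ArrayList<>(staging); // the "once at the end" work
        staging.clear();                        // cleanup of temporary storage
    }

    public int stagedCount() {
        return staging.size();
    }

    public static void main(String[] args) {
        TwoStepSketch job = new TwoStepSketch();
        List<Integer> input = new ArrayList<>();
        for (int i = 1; i <= 5; i++) {
            input.add(i);
        }
        job.stepOne(input, 2);
        job.stepTwo();
        System.out.println(job.chunksWritten + " chunks staged, final step ran "
                + job.finalRuns + " time(s) over " + job.finalOutput.size() + " items");
    }
}
```

The point of the split is that the per-chunk writer stays cheap and restartable, while the logic that must see all the data runs exactly once, no matter how many chunks the reader produced.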
