How to know which chunk failed in Spring Batch and assign a counter value?

oyjwcjzk, posted 2021-07-23 in Java

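I am extending this earlier question: "Identify which chunk has failed in a chunk-based step in Spring Batch".
Could you show me code for the following?
How do I know which chunk failed?
How do I create a counter and assign its auto-incremented value to a field that is not the primary key, then save it to the database?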

Answer #1 (vvppvyoh)

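In a non-fault-tolerant step, the first error in any chunk fails the step, so you can get the chunk number from an in-memory counter kept in a ChunkListener implementation, as mentioned in the earlier question. Here is a simple example: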

import java.util.Arrays;

import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.batch.core.listener.ChunkListenerSupport;
import org.springframework.batch.core.scope.context.ChunkContext;
import org.springframework.batch.item.support.ListItemReader;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
@EnableBatchProcessing
public class MyJobConfiguration {

    @Bean
    public Job job(JobBuilderFactory jobs, StepBuilderFactory steps) {
        return jobs.get("job")
                .start(steps.get("step")
                        .<Integer, Integer>chunk(5)
                        .reader(new ListItemReader<>(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)))
                        .writer(items -> {
                            System.out.println("About to write items " + items);
                            if (items.contains(8)) {
                                throw new Exception("No 8 here!");
                            }
                        })
                        .listener(new MyChunkListener())
                        .build())
                .build();
    }

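    // Counts chunks in memory. A plain int is enough for a single-threaded step;
    // it is not thread-safe and is not persisted across restarts.
    static class MyChunkListener extends ChunkListenerSupport {
        private int counter;

        // Called before each chunk starts, so the counter is the 1-based chunk number.
        @Override
        public void beforeChunk(ChunkContext context) {
            counter++;
        }

        // Called when the current chunk fails; the counter still holds its number.
        @Override
        public void afterChunkError(ChunkContext context) {
            System.out.println("Chunk number " + counter + " failed");
        }
    }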

    public static void main(String[] args) throws Exception {
        ApplicationContext context = new AnnotationConfigApplicationContext(MyJobConfiguration.class);
        JobLauncher jobLauncher = context.getBean(JobLauncher.class);
        Job job = context.getBean(Job.class);
        jobLauncher.run(job, new JobParameters());
    }

}

This prints:

About to write items [1, 2, 3, 4, 5]
About to write items [6, 7, 8, 9, 10]
Chunk number 2 failed
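
The chunk number can be turned back into a range of input items with simple arithmetic. The following is a minimal sketch in plain Java, not part of Spring Batch: `ChunkRange`, `firstIndex` and `endIndex` are hypothetical names introduced here. It assumes a single-threaded, non-fault-tolerant step whose reader returns items in a fixed order and never filters any out.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

// Hypothetical helper, not a Spring Batch class.
class ChunkRange {

    // Zero-based index of the first item belonging to a 1-based chunk number.
    static int firstIndex(int chunkNumber, int chunkSize) {
        return (chunkNumber - 1) * chunkSize;
    }

    // Zero-based exclusive end index of that chunk, capped at the total
    // number of items because the last chunk may be shorter than chunkSize.
    static int endIndex(int chunkNumber, int chunkSize, int totalItems) {
        return Math.min(chunkNumber * chunkSize, totalItems);
    }

    public static void main(String[] args) {
        // Same input and chunk size as the example above.
        List<Integer> items = IntStream.rangeClosed(1, 10).boxed().collect(Collectors.toList());
        int failedChunk = 2; // value reported by the listener
        int chunkSize = 5;   // matches chunk(5) in the step definition

        List<Integer> suspects = items.subList(
                firstIndex(failedChunk, chunkSize),
                endIndex(failedChunk, chunkSize, items.size()));
        System.out.println("Items in failed chunk: " + suspects); // [6, 7, 8, 9, 10]
    }
}
```

This narrows the failure down to the chunk only; it does not say which item inside the chunk caused the exception.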

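In a fault-tolerant step, however, the items of a failed chunk are retried one by one: Spring Batch creates single-item chunks for them. The ChunkListener is then called for each of these single-item chunks as well, so the counter has to be interpreted with that in mind. Here is a fault-tolerant version of the previous example: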

import java.util.Arrays;

import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.batch.core.listener.ChunkListenerSupport;
import org.springframework.batch.core.scope.context.ChunkContext;
import org.springframework.batch.item.support.ListItemReader;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
@EnableBatchProcessing
public class MyJobConfiguration {

    @Bean
    public Job job(JobBuilderFactory jobs, StepBuilderFactory steps) {
        return jobs.get("job")
                .start(steps.get("step")
                        .<Integer, Integer>chunk(5)
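                        .faultTolerant()        // enable skip handling for this step
                        .skip(Exception.class)  // treat any Exception as skippable
                        .skipLimit(10)          // allow at most 10 skipped items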
                        .reader(new ListItemReader<>(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)))
                        .writer(items -> {
                            System.out.println("About to write items " + items);
                            if (items.contains(8)) {
                                throw new Exception("No 8 here!");
                            }
                        })
                        .listener(new MyChunkListener())
                        .build())
                .build();
    }

    static class MyChunkListener extends ChunkListenerSupport {
        private int counter;

        @Override
        public void beforeChunk(ChunkContext context) {
            counter++;
        }

        @Override
        public void afterChunkError(ChunkContext context) {
            System.out.println("Chunk number " + counter + " failed");
        }
    }

    public static void main(String[] args) throws Exception {
        ApplicationContext context = new AnnotationConfigApplicationContext(MyJobConfiguration.class);
        JobLauncher jobLauncher = context.getBean(JobLauncher.class);
        Job job = context.getBean(Job.class);
        jobLauncher.run(job, new JobParameters());
    }

}

This prints:

About to write items [1, 2, 3, 4, 5]
About to write items [6, 7, 8, 9, 10]
Chunk number 2 failed
About to write items [6]
About to write items [7]
About to write items [8]
Chunk number 5 failed
About to write items [9]
About to write items [10]
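
To see why the second failure is reported as chunk 5 rather than chunk 3, the counting can be replayed outside Spring Batch. The sketch below is a simplified model written for this answer, not Spring Batch code: `ScanCounterModel` and `failingCounters` are hypothetical names, and the model assumes that only the writer fails, that it fails whenever the chunk contains one specific bad item, and that each item of a failed chunk is then rewritten as its own chunk.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Simplified model of the counting described above; NOT Spring Batch code.
class ScanCounterModel {

    // Returns the counter values at which afterChunkError would be reported,
    // assuming the writer fails for any chunk that contains badItem.
    static List<Integer> failingCounters(List<Integer> items, int chunkSize, int badItem) {
        List<Integer> failures = new ArrayList<>();
        int counter = 0;
        for (int start = 0; start < items.size(); start += chunkSize) {
            int end = Math.min(start + chunkSize, items.size());
            List<Integer> chunk = items.subList(start, end);

            counter++; // beforeChunk for the regular chunk
            if (!chunk.contains(badItem)) {
                continue; // regular chunk written successfully
            }
            failures.add(counter); // afterChunkError for the whole chunk

            // Each item of the failed chunk is retried as a single-item chunk.
            for (Integer item : chunk) {
                counter++; // beforeChunk for the single-item chunk
                if (item == badItem) {
                    failures.add(counter); // afterChunkError for the bad item
                }
            }
        }
        return failures;
    }

    public static void main(String[] args) {
        List<Integer> items = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
        System.out.println(failingCounters(items, 5, 8)); // [2, 5]
    }
}
```

Under these assumptions, the difference between the two reported values (5 - 2 = 3) is the position of the bad item inside the failed chunk: the third item of [6, 7, 8, 9, 10] is 8.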
