spring批处理itemwritelistener和skiplistener不能协同工作

cxfofazt  于 2021-07-24  发布在  Java
关注(0)|答案(2)|浏览(459)

我在做一个关于 Spring 的poc SkipListener 以及 ItemWriteListener 因为我的生意需要它,我只需要这样。 Spring 批次 ItemWriteListener 以及 SkipListener 在我看来,这是行不通的。
这是我开发的一些poc代码,但是 @BeforeWrite 不会将任何内容保存到数据库中。
myskiplistener.java文件

public class MySkipListener implements SkipListener<Integer, Integer> {

    @Override
    public void onSkipInRead(Throwable t) {
        System.out.println("@@@MySkipListener| On Skip in Read Error : " + t.getMessage());
    }

    @Override
    public void onSkipInWrite(Integer item, Throwable t) {
        System.out.println("@@@MySkipListener | Skipped in write due to : " + t.getMessage());
    }

    @Override
    public void onSkipInProcess(Integer item, Throwable t) {
        System.out.println("@@@MySkipListener | Skipped in process due to: " + t.getMessage());
    }
}

mysteplistener.java文件

public class MyStepListener implements StepExecutionListener {

    @Override
    public void beforeStep(StepExecution stepExecution) {
        System.out.println("beforeStep");
    }

    @Override
    public ExitStatus afterStep(StepExecution stepExecution) {
        System.out.println("afterStep");
        return ExitStatus.COMPLETED;
    }
}

myitemwritelistener.java文件

public class MyItemWriteListener {

    @Autowired
    private JdbcTemplate jdbcTemplate;

    @BeforeWrite
    @Transactional(propagation = Propagation.REQUIRES_NEW)
    public void beforeWrite(List<? extends Integer> items) {
        System.out.println("########### ItemWriteListener | beforeWrite " + items);

        // MUST FOR ME to save data into DB here !!!
        jdbcTemplate.update("INSERT INTO `test`.`mytest`(`name`) VALUES( 'A')");
    }
}

我的工作.java

import java.util.Arrays;

import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.support.ListItemReader;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class MyJob {

    @Autowired
    private JobBuilderFactory jobs;

    @Autowired
    private StepBuilderFactory steps;

    @Bean
    public ItemReader<Integer> itemReader() {
        return new ListItemReader<>(Arrays.asList(1, 2, 3));
    }

    @Bean
    public ItemWriter<Integer> itemWriter() {
        return items -> {
            for (Integer item : items) {
                if (item.equals(1)) {
                    throw new Exception("No 1 here!");
                }
                System.out.println("item = " + item);
            }
        };
    }

    @Bean
    public Step step() {
        return steps.get("step")
                .<Integer, Integer>chunk(5)
                .reader(itemReader())
                .writer(itemWriter())
                .faultTolerant()
                .skip(Exception.class)
                .skipLimit(10)
                .listener(mySkipListener())
                .listener(myStepListener())
                .listener(myItemWriteListener())
                .build();
    }

    @Bean
    public Job job() {
        return jobs.get("job")
                .start(step())
                .build();
    }

    @Bean
    public MySkipListener mySkipListener() {
        return new MySkipListener();
    }

    @Bean
    public MyStepListener myStepListener() {
        return new MyStepListener();
    }

    @Bean
    public MyItemWriteListener myItemWriteListener() {
        return new MyItemWriteListener();
    }
}
zqry0prt

zqry0prt1#

因为当你使用 @BeforeWrite 为了声明侦听器方法,它将在 Package 此方法的场景后面创建一个springaop代理,但此代理不是springbean,因此它不知道如何对 @Transactional 因此它没有效果。
试着用 ItemWriteListener :

@Component
public class MyItemWriteListener implements ItemWriteListener<Integer> {

    void beforeWrite(List<Integer> items){

    }
}

另一方面,我不会为需要批处理的项(即对象读取)控制与事务相关的内容 ItemReader )因为我自己 spring-batch 已经帮助他们管理事务。每当处理一个新的项目块时,它就已经开始了一个新的事务。当它重试或跳过一个失败项时,自己管理它会把与回滚相关的事情搞砸。

lh80um4z

lh80um4z2#

我开发了poc代码,但是@beforewrite没有将任何内容保存到db中。有什么快速的帮助吗?
原因在itemwritelistener的javadoc中有解释:

in fault tolerant scenarios, any transactional work that is done in one of these methods
would be rolled back and not re-applied. Because of this, it is recommended to not perform
any logic using this listener that participates in a transaction.

来自的所有方法 ItemWriterListener 以及 SkipListener 在spring批处理驱动的事务范围内执行。 ChunkListener 我想这就是你要找的。 ChunkListener#beforeChunk 在事务内部调用 ChunkListener#afterChunk 以及 ChunkListener#afterChunkError 在事务外部调用。这是为了在需要时允许在自己的事务中进行补偿操作。在这种情况下,由您来管理这个单独事务的生命周期。以下是javadoc的摘录:

data access code within this callback will still "participate" in the original transaction

**unless it declares that it runs in its own transaction**.

Hence: Use PROPAGATION_REQUIRES_NEW for any transactional operation that is called from here.

相关问题