早检测到asynclistitemwriter步骤结束

zour9fqk  于 2021-07-18  发布在  Java
关注(0)|答案(1)|浏览(403)

我已经写了一个异步版本的 ItemWriter 要异步写入项目,请执行以下操作:

public class AsyncListItemWriter<T> implements ItemStreamWriter<T>, InitializingBean {

    private ItemWriter<T> delegate;

    private TaskExecutor taskExecutor = new SyncTaskExecutor();

    public void afterPropertiesSet() throws Exception {
        Assert.notNull(delegate, "A delegate ItemWriter must be provided.");
    }

    public void setDelegate(ItemWriter<T> delegate) {
        this.delegate = delegate;
    }

    public void setTaskExecutor(TaskExecutor taskExecutor) {
        this.taskExecutor = taskExecutor;
    }

    @Override
    public void open(ExecutionContext executionContext) throws ItemStreamException {
        if (delegate instanceof ItemStream) {
            ((ItemStream) delegate).open(executionContext);
        }
    }

    @Override
    public void update(ExecutionContext executionContext) throws ItemStreamException {
        if (delegate instanceof ItemStream) {
            ((ItemStream) delegate).update(executionContext);
        }
    }

    @Override
    public void close() throws ItemStreamException {
        if (delegate instanceof ItemStream) {
            ((ItemStream) delegate).close();
        }
    }

    @Override
    public void write(List<? extends T> items)  {
        StepExecution stepExecution = getStepExecution();
        taskExecutor.execute(() -> {
            if (stepExecution != null) {
                StepSynchronizationManager.register(stepExecution);
            }
            try {
                delegate.write(items);
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                if (stepExecution != null) {
                    StepSynchronizationManager.close();
                }
            }
        });
    }

    private StepExecution getStepExecution() {
        StepContext context = StepSynchronizationManager.getContext();
        if (context == null) {
            return null;
        }
        StepExecution stepExecution = context.getStepExecution();
        return stepExecution;
    }
}

配置:

@Bean
    public ThreadPoolTaskExecutor taskExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(64);
        executor.setMaxPoolSize(64);
        executor.setQueueCapacity(64);
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        executor.setThreadNamePrefix("MultiThreaded-");
        return executor;
    }

    @Bean
    public ItemWriter<STModel> writer(){
        return items -> {
            Thread.sleep(1000);
            System.out.println("Writing...");
            for(STModel c : items) {
                System.out.println("######### Writer : ------> " + c + " inside size : " + c.relation.size() + ", On : " +Thread.currentThread().getName());
            }
        };
    }

    @Bean
    public AsyncListItemWriter<STModel> asyncWriter() throws Exception {
        AsyncListItemWriter<STModel> asyncItemWriter = new AsyncListItemWriter<>();
        asyncItemWriter.setDelegate(writer());
        asyncItemWriter.setTaskExecutor(taskExecutor());
        asyncItemWriter.afterPropertiesSet();
        return asyncItemWriter;
    }

    @Bean
    public Step sampleStep() throws Exception{
        return stepBuilderFactory.get("processingStep")
                .<STModel, STModel>chunk(10)
                .reader(itpReader())
                .writer(asyncWriter())
                .build();
    }

    @Bean
    public Job job() throws Exception{
        return jobBuilderFactory.get("job")
                .start(sampleStep())
                .build();
    }

读取所有文件后,我将收到以下日志(在主线程上读取该文件):
同时读取---->stmodel(tripid=109138356-1459178)
同时读取---->stmodel(tripid=109138356-1459178)
同时读取---->null
读取---->null,开:main
2021-03-16 10:38:48.409信息17042---[main]o.s.batch.core.step.abstractstep:步骤:[processingstep]在337ms内执行
2021-03-16 10:38:48.412信息17042---[main]o.s.b.c.l.support.simplejoblauncher:作业:[simplejob:[name=job]]已完成,使用以下参数:[{}]和以下状态:[completed]在349ms内完成
正在写入。。。

######### writer:------>stmodel(tripid=109138355-1459164)内部大小:2,开:多线程-1

######### writer:------>stmodel(tripid=109138355-1459165)内部大小:1,开:多线程-1

######### writer:------>stmodel(tripid=109138355-1459166)内部大小:1,开:多线程-1

######### writer:------>stmodel(tripid=109138355-1459167)内部大小:1,开:多线程-1

######### writer:------>stmodel(tripid=109138355-1459168)内部大小:2,开:多线程-1

######### writer:------>stmodel(tripid=113507833-138959)内部大小:23,开:多线程-1

######### writer:------>stmodel(tripid=113507835-138960)内部大小:23,开:多线程-1

######### writer:------>stmodel(tripid=113507852-138961)内部大小:23,开:多线程-1

######### writer:------>stmodel(tripid=113507863-138962)内部大小:23,开:多线程-1

######### writer:------>stmodel(tripid=113507871-138963)内部大小:23,开:多线程-1

正在写入。。。

######### writer:------>stmodel(tripid=113507882-138964)内部大小:23,开:多线程-2

######### writer:------>stmodel(tripid=113507890-138965)内部大小:23,开:多线程-2

######### writer:------>stmodel(tripid=113507900-138966)内部大小:23,开:多线程-2

######### writer:------>stmodel(tripid=113507911-138967)内部大小:23,开:多线程-2

如您所见,spring批处理过早地检测到steap的结束,就在读取操作之后。
我怎样才能告诉springbatch步骤的结束是在所有的写作任务完成之后?

2mbi3lxu

2mbi3lxu1#

如你所见,spring批处理过早地检测到steap的结束,
这是您应该期待的,因为您的编写器在后台异步编写项目,而不必等待它们完成。一旦作者 write 方法返回时,该步骤将继续读取下一个项目块,如果没有其他项目可供读取,则该步骤可能会完成。这可能发生在异步任务执行器在后台写入上一个块之前。
我怎样才能告诉springbatch步骤的结束是在所有的写作任务完成之后?
在你的写作中,你需要 submit 任务(而不是 execute 他们),得到他们的控制 Future 然后等他们完成 Future.get (如果需要,可以超时)。
编辑:添加示例

@Override
public void write(List<? extends T> items) {
    List<Future<?>> futures = new ArrayList<>(items.size());
    // write items in parallel (note the singleton list passed to the delegate)
    for (T item : items) { 
        Future<?> future = taskExecutor.submit(() -> {
            try {
                delegate.write(Collections.singletonList(item));
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        });
        futures.add(future);
    }
    // wait for futures to finish
    futures.forEach(future -> {
        try {
            future.get(10, TimeUnit.SECONDS);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    });
}

我在这里看到的唯一优点是,项目将并行编写。这可以加快你的每一个项目的电子邮件发送用例的事情。不过,我从你的评论中看到了以下几点:
写作与阅读并行,因此它加快了处理速度!
这是不正确的,读取下一个块仍将等待当前写入操作完成。因此,如上所述,您需要等待writer中的写入操作完成,否则您的作业可能看起来已经完成,而项目仍在后台写入。
如果确实希望多线程同时执行读写操作,则需要使用多线程步骤:

@Bean
public Step sampleStep() throws Exception{
    return stepBuilderFactory.get("processingStep")
                .<STModel, STModel>chunk(10)
                .reader(itpReader())
                .writer(writer())
                .taskExecutor(taskExecutor())
                .build();
}

另一种选择是使用并发步骤,如本期所述:https://github.com/spring-projects/spring-batch/issues/2044. 我有一个使用 BlockingQueue 如果你有兴趣的话,可以在这里作为中转站。

相关问题