我已经写了一个异步版本的 ItemWriter
要异步写入项目,请执行以下操作:
public class AsyncListItemWriter<T> implements ItemStreamWriter<T>, InitializingBean {
private ItemWriter<T> delegate;
private TaskExecutor taskExecutor = new SyncTaskExecutor();
public void afterPropertiesSet() throws Exception {
Assert.notNull(delegate, "A delegate ItemWriter must be provided.");
}
public void setDelegate(ItemWriter<T> delegate) {
this.delegate = delegate;
}
public void setTaskExecutor(TaskExecutor taskExecutor) {
this.taskExecutor = taskExecutor;
}
@Override
public void open(ExecutionContext executionContext) throws ItemStreamException {
if (delegate instanceof ItemStream) {
((ItemStream) delegate).open(executionContext);
}
}
@Override
public void update(ExecutionContext executionContext) throws ItemStreamException {
if (delegate instanceof ItemStream) {
((ItemStream) delegate).update(executionContext);
}
}
@Override
public void close() throws ItemStreamException {
if (delegate instanceof ItemStream) {
((ItemStream) delegate).close();
}
}
@Override
public void write(List<? extends T> items) {
StepExecution stepExecution = getStepExecution();
taskExecutor.execute(() -> {
if (stepExecution != null) {
StepSynchronizationManager.register(stepExecution);
}
try {
delegate.write(items);
} catch (Exception e) {
e.printStackTrace();
} finally {
if (stepExecution != null) {
StepSynchronizationManager.close();
}
}
});
}
private StepExecution getStepExecution() {
StepContext context = StepSynchronizationManager.getContext();
if (context == null) {
return null;
}
StepExecution stepExecution = context.getStepExecution();
return stepExecution;
}
}
配置:
@Bean
public ThreadPoolTaskExecutor taskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(64);
executor.setMaxPoolSize(64);
executor.setQueueCapacity(64);
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
executor.setThreadNamePrefix("MultiThreaded-");
return executor;
}
@Bean
public ItemWriter<STModel> writer(){
return items -> {
Thread.sleep(1000);
System.out.println("Writing...");
for(STModel c : items) {
System.out.println("######### Writer : ------> " + c + " inside size : " + c.relation.size() + ", On : " +Thread.currentThread().getName());
}
};
}
@Bean
public AsyncListItemWriter<STModel> asyncWriter() throws Exception {
AsyncListItemWriter<STModel> asyncItemWriter = new AsyncListItemWriter<>();
asyncItemWriter.setDelegate(writer());
asyncItemWriter.setTaskExecutor(taskExecutor());
asyncItemWriter.afterPropertiesSet();
return asyncItemWriter;
}
@Bean
public Step sampleStep() throws Exception{
return stepBuilderFactory.get("processingStep")
.<STModel, STModel>chunk(10)
.reader(itpReader())
.writer(asyncWriter())
.build();
}
@Bean
public Job job() throws Exception{
return jobBuilderFactory.get("job")
.start(sampleStep())
.build();
}
读取所有文件后,我将收到以下日志(在主线程上读取该文件):
同时读取---->stmodel(tripid=109138356-1459178)
同时读取---->stmodel(tripid=109138356-1459178)
同时读取---->null
读取---->null,开:main
2021-03-16 10:38:48.409信息17042---[main]o.s.batch.core.step.abstractstep:步骤:[processingstep]在337ms内执行
2021-03-16 10:38:48.412信息17042---[main]o.s.b.c.l.support.simplejoblauncher:作业:[simplejob:[name=job]]已完成,使用以下参数:[{}]和以下状态:[completed]在349ms内完成
正在写入。。。
######### writer:------>stmodel(tripid=109138355-1459164)内部大小:2,开:多线程-1
######### writer:------>stmodel(tripid=109138355-1459165)内部大小:1,开:多线程-1
######### writer:------>stmodel(tripid=109138355-1459166)内部大小:1,开:多线程-1
######### writer:------>stmodel(tripid=109138355-1459167)内部大小:1,开:多线程-1
######### writer:------>stmodel(tripid=109138355-1459168)内部大小:2,开:多线程-1
######### writer:------>stmodel(tripid=113507833-138959)内部大小:23,开:多线程-1
######### writer:------>stmodel(tripid=113507835-138960)内部大小:23,开:多线程-1
######### writer:------>stmodel(tripid=113507852-138961)内部大小:23,开:多线程-1
######### writer:------>stmodel(tripid=113507863-138962)内部大小:23,开:多线程-1
######### writer:------>stmodel(tripid=113507871-138963)内部大小:23,开:多线程-1
正在写入。。。
######### writer:------>stmodel(tripid=113507882-138964)内部大小:23,开:多线程-2
######### writer:------>stmodel(tripid=113507890-138965)内部大小:23,开:多线程-2
######### writer:------>stmodel(tripid=113507900-138966)内部大小:23,开:多线程-2
######### writer:------>stmodel(tripid=113507911-138967)内部大小:23,开:多线程-2
如您所见,spring批处理过早地检测到steap的结束,就在读取操作之后。
我怎样才能告诉springbatch步骤的结束是在所有的写作任务完成之后?
1条答案
按热度按时间2mbi3lxu1#
如你所见,spring批处理过早地检测到steap的结束,
这是您应该期待的,因为您的编写器在后台异步编写项目,而不必等待它们完成。一旦作者
write
方法返回时,该步骤将继续读取下一个项目块,如果没有其他项目可供读取,则该步骤可能会完成。这可能发生在异步任务执行器在后台写入上一个块之前。我怎样才能告诉springbatch步骤的结束是在所有的写作任务完成之后?
在你的写作中,你需要
submit
任务(而不是execute
他们),得到他们的控制Future
然后等他们完成Future.get
(如果需要,可以超时)。编辑:添加示例
我在这里看到的唯一优点是,项目将并行编写。这可以加快你的每一个项目的电子邮件发送用例的事情。不过,我从你的评论中看到了以下几点:
写作与阅读并行,因此它加快了处理速度!
这是不正确的,读取下一个块仍将等待当前写入操作完成。因此,如上所述,您需要等待writer中的写入操作完成,否则您的作业可能看起来已经完成,而项目仍在后台写入。
如果确实希望多线程同时执行读写操作,则需要使用多线程步骤:
另一种选择是使用并发步骤,如本期所述:https://github.com/spring-projects/spring-batch/issues/2044. 我有一个使用
BlockingQueue
如果你有兴趣的话,可以在这里作为中转站。