我创建了一个spring批处理作业,它从mongodb读取订单,并发出rest调用来上传它们。但是,即使mongoitemreader没有读取所有记录,批处理作业也会自动完成。
我正在维护一个现场批次processed:boolean on 订单收集。mongoitemreader读取{batchprocessed:{$ne:true}}的记录,因为我需要多次运行批处理作业,而不是一次又一次地处理相同的文档。
在orderwriter中,我将batchprocessed设置为true。
@Bean
@StepScope
public MongoItemReader<Order> orderReader() {
MongoItemReader<Order> reader = new MongoItemReader<>();
reader.setTemplate(mongoTempate);
HashMap<String,Sort.Direction> sortMap = new HashMap<>();
sortMap.put("_id",Direction.ASC);
reader.setSort(sortMap);
reader.setTargetType(Order.class);
reader.setQuery("{batchProcessed:{$ne:true}}");
return reader;
}
@Bean
public Step uploadOrdersStep(OrderItemProcessor processor) {
return stepBuilderFactory.get("step1").<Order, Order>chunk(1)
.reader(orderReader()).processor(processor).writer(orderWriter).build();
}
@Bean
public Job orderUploadBatchJob(JobBuilderFactory factory, OrderItemProcessor processor) {
return factory.get("uploadOrder").flow(uploadOrdersStep(processor)).end().build();
}
1条答案
按热度按时间jvidinwx1#
这个
MongoItemReader
是分页项读取器。当读取页面中的项目并更改查询可能返回的项目(即在查询的“where”子句中使用的字段)时,分页逻辑可能会丢失,某些项目可能会被跳过。jpa分页项读取器也有一个类似的问题,这里将详细解释:spring批处理jpapagingitemreader为什么不读取某些行?解决此问题的常见技术是使用基于游标的读取器、使用暂存表/集合、使用每页有一个分区的分区步骤等。