我使用Spring批处理,元数据存储在mongo DB中,存储在以下集合中
JobInstance
JobExecution
StepExecution
ExecutionContext
Sequences
当我将Spring batch从版本3升级到4时,batch framework没有将元数据记录写入ExecutionContext collection。Job运行良好,并且它将元数据存储到剩余的collection中,没有任何问题。
下面是我们创建的MongoExecutionContextDao
import com.mongodb.client.MongoCollection;
import com.mongodb.client.model.UpdateOptions;
import org.bson.Document;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.StepExecution;
import org.springframework.batch.core.repository.dao.ExecutionContextDao;
import org.springframework.batch.item.ExecutionContext;
import org.springframework.stereotype.Repository;
import org.springframework.util.Assert;
import org.springframework.util.NumberUtils;
import javax.annotation.PostConstruct;
import java.math.BigDecimal;
import java.math.BigInteger;
import java.util.Collection;
import java.util.Map;
@Repository
public class MongoExecutionContextDao extends AbstractMongoDao implements ExecutionContextDao {
@PostConstruct
public void init() {
super.init();
getCollection().createIndex(
new Document().append(STEP_EXECUTION_ID_KEY, 1)
.append(JOB_EXECUTION_ID_KEY, 1)
);
}
@Override
public ExecutionContext getExecutionContext(JobExecution jobExecution) {
return getExecutionContext(JOB_EXECUTION_ID_KEY, jobExecution.getId());
}
@Override
public ExecutionContext getExecutionContext(StepExecution stepExecution) {
return getExecutionContext(STEP_EXECUTION_ID_KEY, stepExecution.getId());
}
@Override
public void saveExecutionContext(JobExecution jobExecution) {
saveOrUpdateExecutionContext(JOB_EXECUTION_ID_KEY, jobExecution.getId(), jobExecution.getExecutionContext());
}
@Override
public void saveExecutionContext(StepExecution stepExecution) {
saveOrUpdateExecutionContext(STEP_EXECUTION_ID_KEY, stepExecution.getId(), stepExecution.getExecutionContext());
}
@Override
public void updateExecutionContext(JobExecution jobExecution) {
saveOrUpdateExecutionContext(JOB_EXECUTION_ID_KEY, jobExecution.getId(), jobExecution.getExecutionContext());
}
@Override
public void updateExecutionContext(StepExecution stepExecution) {
saveOrUpdateExecutionContext(STEP_EXECUTION_ID_KEY, stepExecution.getId(), stepExecution.getExecutionContext());
}
private void saveOrUpdateExecutionContext(String executionIdKey, Long executionId, ExecutionContext executionContext) {
Assert.notNull(executionId, "ExecutionId must not be null.");
Assert.notNull(executionContext, "The ExecutionContext must not be null.");
Document dbObject = new Document(executionIdKey, executionId);
for (Map.Entry<String, Object> entry : executionContext.entrySet()) {
Object value = entry.getValue();
String key = entry.getKey();
dbObject.put(key.replaceAll(DOT_STRING, DOT_ESCAPE_STRING), value);
if (value instanceof BigDecimal || value instanceof BigInteger) {
dbObject.put(key + TYPE_SUFFIX, value.getClass().getName());
}
}
Document updateDoc = new Document("$set",dbObject);
getCollection().updateOne(dbObject,updateDoc);
}
@SuppressWarnings({"unchecked"})
private ExecutionContext getExecutionContext(String executionIdKey, Long executionId) {
Assert.notNull(executionId, "ExecutionId must not be null.");
Document result = (Document) getCollection().find(new Document(executionIdKey, executionId)).first();
ExecutionContext executionContext = new ExecutionContext();
if (result != null) {
result.remove(executionIdKey);
removeSystemFields(result);
for (String key : result.keySet()) {
Object value = result.get(key);
String type = (String) result.get(key + TYPE_SUFFIX);
if (type != null && Number.class.isAssignableFrom(value.getClass())) {
try {
value = NumberUtils.convertNumberToTargetClass((Number) value, (Class<? extends Number>) Class.forName(type));
} catch (Exception e) {
logger.warn("Failed to convert {} to {}", key, type);
}
}
//Mongo db does not allow key name with "." character.
executionContext.put(
key.replaceAll(DOT_ESCAPE_STRING, DOT_STRING), value);
}
}
return executionContext;
}
@Override
protected MongoCollection getCollection() {
return mongoTemplate.getCollection(ExecutionContext.class.getSimpleName());
}
@Override
public void saveExecutionContexts(Collection<StepExecution> stepExecutions) {
Assert.notNull(stepExecutions, "Attempt to save a null collection of step executions");
for (StepExecution stepExecution : stepExecutions) {
saveExecutionContext(stepExecution);
saveExecutionContext(stepExecution.getJobExecution());
}
}
}
这是否与版本4中的'ExecutionContext'的反序列化机制更改有关?非常感谢任何帮助!!
2条答案
按热度按时间bkkx9g8r1#
Spring Batch 4不支持MongoDB作为作业存储库,因此您报告的问题应该与您的自定义实现相关。
序列化机制已从v3更改为v4,请参阅:https://docs.spring.io/spring-batch/docs/4.0.x/reference/html/whatsnew.html#whatsNewSerialization。因此,这可能是您的问题的原因。
nlejzf6q2#
这与我错过添加更新选项(将upsert设置为true)的自定义实现中的SPRING BATCH无关
而不是仅仅
改为下文