java 升级到版本4后,Spring批处理未写入ExecutionContext mongo元数据集合

ecfsfe2w  于 2023-04-04  发布在  Java
关注(0)|答案(2)|浏览(98)

我使用Spring批处理,元数据存储在mongo DB中,存储在以下集合中

JobInstance
JobExecution
StepExecution
ExecutionContext
Sequences

当我将Spring batch从版本3升级到4时,batch framework没有将元数据记录写入ExecutionContext collection。Job运行良好,并且它将元数据存储到剩余的collection中,没有任何问题。
下面是我们创建的MongoExecutionContextDao

import com.mongodb.client.MongoCollection;
import com.mongodb.client.model.UpdateOptions;
import org.bson.Document;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.StepExecution;
import org.springframework.batch.core.repository.dao.ExecutionContextDao;
import org.springframework.batch.item.ExecutionContext;
import org.springframework.stereotype.Repository;
import org.springframework.util.Assert;
import org.springframework.util.NumberUtils;

import javax.annotation.PostConstruct;
import java.math.BigDecimal;
import java.math.BigInteger;
import java.util.Collection;
import java.util.Map;

@Repository
public class MongoExecutionContextDao extends AbstractMongoDao implements ExecutionContextDao {

    @PostConstruct
    public void init() {
        super.init();

        getCollection().createIndex(
                new Document().append(STEP_EXECUTION_ID_KEY, 1)
                        .append(JOB_EXECUTION_ID_KEY, 1)
        );
    }

    @Override
    public ExecutionContext getExecutionContext(JobExecution jobExecution) {
        return getExecutionContext(JOB_EXECUTION_ID_KEY, jobExecution.getId());
    }

    @Override
    public ExecutionContext getExecutionContext(StepExecution stepExecution) {
        return getExecutionContext(STEP_EXECUTION_ID_KEY, stepExecution.getId());
    }

    @Override
    public void saveExecutionContext(JobExecution jobExecution) {
        saveOrUpdateExecutionContext(JOB_EXECUTION_ID_KEY, jobExecution.getId(), jobExecution.getExecutionContext());
    }

    @Override
    public void saveExecutionContext(StepExecution stepExecution) {
        saveOrUpdateExecutionContext(STEP_EXECUTION_ID_KEY, stepExecution.getId(), stepExecution.getExecutionContext());
    }

    @Override
    public void updateExecutionContext(JobExecution jobExecution) {
        saveOrUpdateExecutionContext(JOB_EXECUTION_ID_KEY, jobExecution.getId(), jobExecution.getExecutionContext());
    }

    @Override
    public void updateExecutionContext(StepExecution stepExecution) {
        saveOrUpdateExecutionContext(STEP_EXECUTION_ID_KEY, stepExecution.getId(), stepExecution.getExecutionContext());
    }

    private void saveOrUpdateExecutionContext(String executionIdKey, Long executionId, ExecutionContext executionContext) {
        Assert.notNull(executionId, "ExecutionId must not be null.");
        Assert.notNull(executionContext, "The ExecutionContext must not be null.");

        Document dbObject = new Document(executionIdKey, executionId);
        for (Map.Entry<String, Object> entry : executionContext.entrySet()) {
            Object value = entry.getValue();
            String key = entry.getKey();
            dbObject.put(key.replaceAll(DOT_STRING, DOT_ESCAPE_STRING), value);
            if (value instanceof BigDecimal || value instanceof BigInteger) {
                dbObject.put(key + TYPE_SUFFIX, value.getClass().getName());
            }
        }
        Document updateDoc = new Document("$set",dbObject);

        getCollection().updateOne(dbObject,updateDoc);
            }

    @SuppressWarnings({"unchecked"})
    private ExecutionContext getExecutionContext(String executionIdKey, Long executionId) {
        Assert.notNull(executionId, "ExecutionId must not be null.");
        Document result = (Document) getCollection().find(new Document(executionIdKey, executionId)).first();
        ExecutionContext executionContext = new ExecutionContext();
        if (result != null) {
            result.remove(executionIdKey);
            removeSystemFields(result);
            for (String key : result.keySet()) {
                Object value = result.get(key);
                String type = (String) result.get(key + TYPE_SUFFIX);
                if (type != null && Number.class.isAssignableFrom(value.getClass())) {
                    try {
                        value = NumberUtils.convertNumberToTargetClass((Number) value, (Class<? extends Number>) Class.forName(type));
                    } catch (Exception e) {
                        logger.warn("Failed to convert {} to {}", key, type);
                    }
                }
                //Mongo db does not allow key name with "." character.
                executionContext.put(
                        key.replaceAll(DOT_ESCAPE_STRING, DOT_STRING), value);
            }
        }
        return executionContext;
    }

    @Override
    protected MongoCollection getCollection() {
        return mongoTemplate.getCollection(ExecutionContext.class.getSimpleName());
    }

    @Override
    public void saveExecutionContexts(Collection<StepExecution> stepExecutions) {
        Assert.notNull(stepExecutions, "Attempt to save a null collection of step executions");
        for (StepExecution stepExecution : stepExecutions) {
            saveExecutionContext(stepExecution);
            saveExecutionContext(stepExecution.getJobExecution());
        }
    }
}

这是否与版本4中的'ExecutionContext'的反序列化机制更改有关?非常感谢任何帮助!!

bkkx9g8r

bkkx9g8r1#

Spring Batch 4不支持MongoDB作为作业存储库,因此您报告的问题应该与您的自定义实现相关。
序列化机制已从v3更改为v4,请参阅:https://docs.spring.io/spring-batch/docs/4.0.x/reference/html/whatsnew.html#whatsNewSerialization。因此,这可能是您的问题的原因。

nlejzf6q

nlejzf6q2#

这与我错过添加更新选项(将upsert设置为true)的自定义实现中的SPRING BATCH无关
而不是仅仅

getCollection().updateOne(dbObject,updateDoc);

改为下文

UpdateOptions updateOptions = new UpdateOptions();
updateOptions.upsert(true);
getCollection().updateOne(dbObject,updateDoc,updateOptions);

相关问题