Nacos Source Code Analysis 22: Data Consistency Synchronization (CP)

x33g5p2x, reposted on 2021-12-20 in Other

Next is RaftConsistencyServiceImpl, the CP (strong consistency) implementation.

Again we start from the put method:

@Override
public void put(String key, Record value) throws NacosException {
    try {
        raftCore.signalPublish(key, value);
    } catch (Exception e) {
        Loggers.RAFT.error("Raft put failed.", e);
        throw new NacosException(NacosException.SERVER_ERROR, "Raft put failed, key:" + key + ", value:" + value,
                e);
    }
}

Next is raftCore.signalPublish:

public void signalPublish(String key, Record value) throws Exception {

    // Not the leader
    if (!isLeader()) {
        ObjectNode params = JacksonUtils.createEmptyJsonNode();
        params.put("key", key);
        params.replace("value", JacksonUtils.transferToJsonNode(value));
        Map<String, String> parameters = new HashMap<>(1);
        parameters.put("key", key);
        
        final RaftPeer leader = getLeader();

        // Hand the write to the leader via /v1/ns/raft/datum
        raftProxy.proxyPostLarge(leader.ip, API_PUB, params.toString(), parameters);
        return;
    }
    
    try {
        // This node is the leader
        // Acquire the lock
        OPERATE_LOCK.lock();
        final long start = System.currentTimeMillis();
        final Datum datum = new Datum();
        datum.key = key;
        datum.value = value;
        if (getDatum(key) == null) {
            datum.timestamp.set(1L);
        } else {
            datum.timestamp.set(getDatum(key).timestamp.incrementAndGet());
        }
        
        ObjectNode json = JacksonUtils.createEmptyJsonNode();
        json.replace("datum", JacksonUtils.transferToJsonNode(datum));
        json.replace("source", JacksonUtils.transferToJsonNode(peers.local()));

        // Apply the change locally and queue the change notification. peers holds all nodes; peers.local() returns this node
        onPublish(datum, peers.local());
        
        final String content = json.toString();

        // The put returns only after a majority has synced successfully: strong consistency, the CP model
        // Only a majority of the nodes is required
        final CountDownLatch latch = new CountDownLatch(peers.majorityCount());
        // Iterate over all nodes
        for (final String server : peers.allServersIncludeMyself()) {
            // The leader counts itself once
            if (isLeader(server)) {
                latch.countDown();
                continue;
            }
            // /v1/ns/raft/datum/commit
            final String url = buildUrl(server, API_ON_PUB);
            // Replicate the data asynchronously
            HttpClient.asyncHttpPostLarge(url, Arrays.asList("key=" + key), content,
                    new AsyncCompletionHandler<Integer>() {
                        @Override
                        public Integer onCompleted(Response response) throws Exception {
                            if (response.getStatusCode() != HttpURLConnection.HTTP_OK) {
                                Loggers.RAFT
                                        .warn("[RAFT] failed to publish data to peer, datumId={}, peer={}, http code={}",
                                                datum.key, server, response.getStatusCode());
                                return 1;
                            }
                            // Replication succeeded: count down
                            latch.countDown();
                            return 0;
                        }
                        
                        @Override
                        public STATE onContentWriteCompleted() {
                            return STATE.CONTINUE;
                        }
                    });
            
        }

        // Wait for a majority to finish, with a 5-second timeout
        if (!latch.await(UtilsAndCommons.RAFT_PUBLISH_TIMEOUT, TimeUnit.MILLISECONDS)) {
            // only majority servers return success can we consider this update success
            Loggers.RAFT.error("data publish failed, caused failed to notify majority, key={}", key);
            throw new IllegalStateException("data publish failed, caused failed to notify majority, key=" + key);
        }
        
        long end = System.currentTimeMillis();
        Loggers.RAFT.info("signalPublish cost {} ms, key: {}", (end - start), key);
    } finally {
        // Release the lock
        OPERATE_LOCK.unlock();
    }
}

There are two branches here: the current node is the leader, or it is not.

When the current node is not the leader

In this case raftProxy.proxyPostLarge forwards the request to the leader, where RaftController's publish method receives it:

@PostMapping("/datum")
public String publish(HttpServletRequest request, HttpServletResponse response) throws Exception {
    
    response.setHeader("Content-Type", "application/json; charset=" + getAcceptEncoding(request));
    response.setHeader("Cache-Control", "no-cache");
    response.setHeader("Content-Encode", "gzip");
    
    String entity = IoUtils.toString(request.getInputStream(), "UTF-8");
    String value = URLDecoder.decode(entity, "UTF-8");
    JsonNode json = JacksonUtils.toObj(value);
    
    String key = json.get("key").asText();
    if (KeyBuilder.matchInstanceListKey(key)) {
        raftConsistencyService.put(key, JacksonUtils.toObj(json.get("value").toString(), Instances.class));
        return "ok";
    }
    
    if (KeyBuilder.matchSwitchKey(key)) {
        raftConsistencyService.put(key, JacksonUtils.toObj(json.get("value").toString(), SwitchDomain.class));
        return "ok";
    }
    
    if (KeyBuilder.matchServiceMetaKey(key)) {
        raftConsistencyService.put(key, JacksonUtils.toObj(json.get("value").toString(), Service.class));
        return "ok";
    }
    
    throw new NacosException(NacosException.INVALID_PARAM, "unknown type publish key: " + key);
}

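As you can see, it simply calls raftConsistencyService.put, which brings us back to where we started, except that this time the call runs on the leader.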
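
The forwarding round trip can be pictured with a minimal, self-contained sketch. It is not Nacos code: the Node class, its fields, and the in-process call that stands in for the HTTP proxy are all hypothetical, chosen only to show that a write entering on a follower ends up in the leader branch of the same put method.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of "forward to the leader, then re-enter put on the leader".
class LeaderForwardSketch {

    static class Node {
        final String name;
        Node leader;                                     // assigned after all nodes exist
        final List<String> handled = new ArrayList<>();  // writes processed in the leader branch

        Node(String name) {
            this.name = name;
        }

        boolean isLeader() {
            return leader == this;
        }

        void put(String key, String value) {
            if (!isLeader()) {
                // Stand-in for the HTTP forward done by raftProxy.proxyPostLarge(...)
                leader.put(key, value);
                return;
            }
            // Leader branch: in Nacos this is where locking and replication happen
            handled.add(key + "=" + value);
        }
    }

    public static void main(String[] args) {
        Node leader = new Node("n1");
        Node follower = new Node("n2");
        leader.leader = leader;
        follower.leader = leader;

        follower.put("k", "v");
        System.out.println(leader.handled);    // the leader processed the write
        System.out.println(follower.handled);  // the follower only forwarded it
    }
}
```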

When the current node is the leader

First, onPublish:
public void onPublish(Datum datum, RaftPeer source) throws Exception {
    RaftPeer local = peers.local();
    if (datum.value == null) {
        Loggers.RAFT.warn("received empty datum");
        throw new IllegalStateException("received empty datum");
    }

    // Only the leader is allowed to publish
    if (!peers.isLeader(source.ip)) {
        Loggers.RAFT
                .warn("peer {} tried to publish data but wasn't leader, leader: {}", JacksonUtils.toJson(source),
                        JacksonUtils.toJson(getLeader()));
        throw new IllegalStateException("peer(" + source.ip + ") tried to publish " + "data but wasn't leader");
    }

    // The publisher's term is stale
    if (source.term.get() < local.term.get()) {
        Loggers.RAFT.warn("out of date publish, pub-term: {}, cur-term: {}", JacksonUtils.toJson(source),
                JacksonUtils.toJson(local));
        throw new IllegalStateException(
                "out of date publish, pub-term:" + source.term.get() + ", cur-term: " + local.term.get());
    }

    // Reset the leader-due timer
    local.resetLeaderDue();
    
    // if data should be persisted, usually this is true:
    if (KeyBuilder.matchPersistentKey(datum.key)) {
        raftStore.write(datum);
    }

    // Store the datum in the in-memory map
    datums.put(datum.key, datum);
    
    if (isLeader()) {
        local.term.addAndGet(PUBLISH_TERM_INCREASE_COUNT);
    } else {
        if (local.term.get() + PUBLISH_TERM_INCREASE_COUNT > source.term.get()) {
            //set leader term:
            getLeader().term.set(source.term.get());
            local.term.set(getLeader().term.get());
        } else {
            local.term.addAndGet(PUBLISH_TERM_INCREASE_COUNT);
        }
    }
    raftStore.updateTerm(local.term.get());

    // Queue a change notification task
    notifier.addTask(datum.key, ApplyAction.CHANGE);
    
    Loggers.RAFT.info("data added/updated, key={}, term={}", datum.key, local.term);
}

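After validating the request (non-empty value, source is the leader, source term not stale), this method does several things:

  1. Resets the leader-due timer via local.resetLeaderDue().
  2. If the data must be persisted, writes it to disk with raftStore.write(datum).
  3. Caches the datum in the local datums map.
  4. Updates the term and persists it with raftStore.updateTerm.
  5. Adds a CHANGE task to the notifier.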
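
The term update in step 4 is the least obvious part, so here is a small sketch that isolates it as a pure function. This is a restatement of the if/else in onPublish under assumptions: the method name nextLocalTerm is hypothetical, and the increment (PUBLISH_TERM_INCREASE_COUNT in the source) is passed in as a parameter rather than assumed to have a particular value.

```java
// Hypothetical restatement of the term arithmetic at the end of onPublish.
class TermUpdateSketch {

    /**
     * @param isLeader   whether the local node is the leader
     * @param localTerm  the local node's current term
     * @param sourceTerm the term carried by the publishing peer
     * @param increase   stand-in for PUBLISH_TERM_INCREASE_COUNT
     * @return the local term after the publish is applied
     */
    static long nextLocalTerm(boolean isLeader, long localTerm, long sourceTerm, long increase) {
        if (isLeader) {
            // The leader simply advances its own term on every publish
            return localTerm + increase;
        }
        if (localTerm + increase > sourceTerm) {
            // Advancing would overshoot the leader, so adopt the leader's term instead
            return sourceTerm;
        }
        // Still behind the leader: advance by the usual step
        return localTerm + increase;
    }

    public static void main(String[] args) {
        System.out.println(nextLocalTerm(true, 5, 5, 100));
        System.out.println(nextLocalTerm(false, 5, 50, 100));
        System.out.println(nextLocalTerm(false, 5, 105, 100));
    }
}
```

The effect is that a follower never ends up with a term higher than the one the leader sent along with the publish.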

Consistency synchronization

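The leader pushes the datum to every node in the cluster by calling the /v1/ns/raft/datum/commit endpoint. The CountDownLatch is initialized to (cluster size) / 2 + 1, so the put succeeds only once a majority of nodes, the leader included, has acknowledged the write.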

final String content = json.toString();

// The put returns only after a majority has synced successfully: strong consistency, the CP model
// Only a majority of the nodes is required
final CountDownLatch latch = new CountDownLatch(peers.majorityCount());
// Iterate over all nodes
for (final String server : peers.allServersIncludeMyself()) {
    // The leader counts itself once
    if (isLeader(server)) {
        latch.countDown();
        continue;
    }
    // /v1/ns/raft/datum/commit
    final String url = buildUrl(server, API_ON_PUB);
    // Replicate the data asynchronously
    HttpClient.asyncHttpPostLarge(url, Arrays.asList("key=" + key), content,
            new AsyncCompletionHandler<Integer>() {
                @Override
                public Integer onCompleted(Response response) throws Exception {
                    if (response.getStatusCode() != HttpURLConnection.HTTP_OK) {
                        Loggers.RAFT
                                .warn("[RAFT] failed to publish data to peer, datumId={}, peer={}, http code={}",
                                        datum.key, server, response.getStatusCode());
                        return 1;
                    }
                    // Replication succeeded: count down
                    latch.countDown();
                    return 0;
                }
                
                @Override
                public STATE onContentWriteCompleted() {
                    return STATE.CONTINUE;
                }
            });
    
}

// Wait for a majority to finish, with a 5-second timeout
if (!latch.await(UtilsAndCommons.RAFT_PUBLISH_TIMEOUT, TimeUnit.MILLISECONDS)) {
    // only majority servers return success can we consider this update success
    Loggers.RAFT.error("data publish failed, caused failed to notify majority, key={}", key);
    throw new IllegalStateException("data publish failed, caused failed to notify majority, key=" + key);
}
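
To see the majority rule in isolation, the following sketch reproduces the latch pattern without any Nacos or HTTP dependencies. The class name, the publish signature, and the replicate predicate (a stand-in for the asynchronous POST to a follower) are assumptions made for illustration; only the cluster size / 2 + 1 formula and the CountDownLatch usage come from the code above.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.Predicate;

// Hypothetical, dependency-free model of "wait until a majority has acknowledged".
class MajorityAckSketch {

    // Majority size as described in the article: cluster size / 2 + 1
    static int majorityCount(int clusterSize) {
        return clusterSize / 2 + 1;
    }

    /**
     * Returns true if a majority (leader included) acknowledged before the timeout.
     * The replicate predicate is a stand-in for the HTTP call to one follower.
     */
    static boolean publish(List<String> servers, String leader,
                           Predicate<String> replicate, long timeoutMs) {
        CountDownLatch latch = new CountDownLatch(majorityCount(servers.size()));
        for (String server : servers) {
            if (server.equals(leader)) {
                latch.countDown();  // the leader counts its own local write
                continue;
            }
            // Replicate asynchronously; only a successful reply counts down
            CompletableFuture.runAsync(() -> {
                if (replicate.test(server)) {
                    latch.countDown();
                }
            });
        }
        try {
            return latch.await(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        List<String> servers = Arrays.asList("a", "b", "c");
        // One follower answers: leader + 1 follower reaches the majority of 2
        System.out.println(publish(servers, "a", s -> s.equals("b"), 2000));
        // No follower answers: only the leader's own count, so the wait times out
        System.out.println(publish(servers, "a", s -> false, 200));
    }
}
```

Note that failed replies never count down, so a write that cannot reach a majority is only detected when the timeout expires, which matches the latch.await check in signalPublish.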

Now let's look at the receiving side of the sync, RaftController's onPublish method:

@PostMapping("/datum/commit")
public String onPublish(HttpServletRequest request, HttpServletResponse response) throws Exception {
    
    response.setHeader("Content-Type", "application/json; charset=" + getAcceptEncoding(request));
    response.setHeader("Cache-Control", "no-cache");
    response.setHeader("Content-Encode", "gzip");
    
    String entity = IoUtils.toString(request.getInputStream(), "UTF-8");
    String value = URLDecoder.decode(entity, "UTF-8");
    
    JsonNode jsonObject = JacksonUtils.toObj(value);
    String key = "key";
    
    RaftPeer source = JacksonUtils.toObj(jsonObject.get("source").toString(), RaftPeer.class);
    JsonNode datumJson = jsonObject.get("datum");
    
    Datum datum = null;
    if (KeyBuilder.matchInstanceListKey(datumJson.get(key).asText())) {
        datum = JacksonUtils.toObj(jsonObject.get("datum").toString(), new TypeReference<Datum<Instances>>() {
        });
    } else if (KeyBuilder.matchSwitchKey(datumJson.get(key).asText())) {
        datum = JacksonUtils.toObj(jsonObject.get("datum").toString(), new TypeReference<Datum<SwitchDomain>>() {
        });
    } else if (KeyBuilder.matchServiceMetaKey(datumJson.get(key).asText())) {
        datum = JacksonUtils.toObj(jsonObject.get("datum").toString(), new TypeReference<Datum<Service>>() {
        });
    }

    // Delegate to RaftConsistencyServiceImpl.onPut, which in turn calls raftCore.onPublish
    raftConsistencyService.onPut(datum, source);
    return "ok";
}

This calls raftConsistencyService.onPut:

public void onPut(Datum datum, RaftPeer source) throws NacosException {
    try {
        raftCore.onPublish(datum, source);
    } catch (Exception e) {
        Loggers.RAFT.error("Raft onPut failed.", e);
        throw new NacosException(NacosException.SERVER_ERROR,
                "Raft onPut failed, datum:" + datum + ", source: " + source, e);
    }
}

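That leads to raftCore.onPublish, which we already covered above; on a follower, the same validation and term update run with the leader as the source.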
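
On the follower side, the two guard checks at the top of onPublish are what protect against writes from a non-leader or from a leader with a stale term. The sketch below restates those checks with plain values; the class and method names are hypothetical, and IPs and terms are passed in directly instead of being read from RaftPeer objects.

```java
// Hypothetical restatement of the guard checks at the start of onPublish.
class PublishGuardSketch {

    // Throws IllegalStateException, like the original, when the publish must be rejected
    static void check(String sourceIp, String leaderIp, long sourceTerm, long localTerm) {
        if (!sourceIp.equals(leaderIp)) {
            throw new IllegalStateException(
                    "peer(" + sourceIp + ") tried to publish data but wasn't leader");
        }
        if (sourceTerm < localTerm) {
            throw new IllegalStateException(
                    "out of date publish, pub-term:" + sourceTerm + ", cur-term: " + localTerm);
        }
    }

    // Convenience wrapper that turns the exception into a boolean
    static boolean accepts(String sourceIp, String leaderIp, long sourceTerm, long localTerm) {
        try {
            check(sourceIp, leaderIp, sourceTerm, localTerm);
            return true;
        } catch (IllegalStateException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(accepts("10.0.0.1", "10.0.0.1", 3, 3));  // from the leader, term current
        System.out.println(accepts("10.0.0.2", "10.0.0.1", 3, 3));  // not from the leader
        System.out.println(accepts("10.0.0.1", "10.0.0.1", 2, 3));  // stale term
    }
}
```

Because a rejected publish surfaces as an exception, the follower does not return HTTP 200, and the leader's completion handler does not count that node toward the majority.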

Summary

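That completes our analysis of data consistency synchronization in both AP and CP modes. It is easy to follow when read alongside the flow chart. Next, we will look at Nacos's Raft leader election.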
