Nacos源码分析二十一、数据一致性同步-AP

x33g5p2x  于2021-12-20 转载在 其他  
字(8.2k)|赞(0)|评价(0)|浏览(672)

先讨论DistroConsistencyServiceImpl AP最终一致性的实现。

我们知道当添加实例时会调用到put方法:

@Override
public void put(String key, Record value) throws NacosException {
    onPut(key, value);
    // 一致性协议的同步数据。这里同步数据是异步任务执行的,也就是说先返回客户端put成功再同步,弱一致性。 AP模型
    distroProtocol.sync(new DistroKey(key, KeyBuilder.INSTANCE_LIST_KEY_PREFIX), ApplyAction.CHANGE,
            globalConfig.getTaskDispatchPeriod() / 2);
}

onPut是本机节点的Notifier通知change事件。 distroProtocol.sync是最终一致性的数据同步。

onPut

public void onPut(String key, Record value) {
    
    if (KeyBuilder.matchEphemeralInstanceListKey(key)) {
        //创建临时数据
        Datum<Instances> datum = new Datum<>();
        datum.value = (Instances) value;
        datum.key = key;
        datum.timestamp.incrementAndGet();
        //放进一个map里
        dataStore.put(key, datum);
    }

    //没有监听器就返回
    if (!listeners.containsKey(key)) {
        return;
    }

    //有监听立即通知服务有改变
    notifier.addTask(key, ApplyAction.CHANGE);
}

主要就是最后一行notifier.addTask(key, ApplyAction.CHANGE); 就是往tasks队列中添加数据:

public void addTask(String datumKey, ApplyAction action) {

    // 如果services包含key,并且类型是change,则直接返回
    if (services.containsKey(datumKey) && action == ApplyAction.CHANGE) {
        return;
    }
    if (action == ApplyAction.CHANGE) {
        // 如果是change,services添加一个空数据
        services.put(datumKey, StringUtils.EMPTY);
    }
    // 数据对 加入队列中
    tasks.offer(Pair.with(datumKey, action));
}

我们之前分析到,notifier中有一个无限循环的任务,会从tasks中一直take数据,当取到新的任务时,则会发起通知:

@Override
public void run() {
    Loggers.DISTRO.info("distro notifier started");

    // 无限循环
    for (; ; ) {
        try {
            // 消费队列
            Pair<String, ApplyAction> pair = tasks.take();
            handle(pair);
        } catch (Throwable e) {
            Loggers.DISTRO.error("[NACOS-DISTRO] Error while handling notifying task", e);
        }
    }
}

private void handle(Pair<String, ApplyAction> pair) {
    try {
        String datumKey = pair.getValue0();
        ApplyAction action = pair.getValue1();

        // 消费后移除key
        services.remove(datumKey);

        int count = 0;

        // 没有监听直接退出
        if (!listeners.containsKey(datumKey)) {
            return;
        }

        // 遍历监听器,调动对应的监听方法
        for (RecordListener listener : listeners.get(datumKey)) {

            count++;

            try {
                if (action == ApplyAction.CHANGE) {
                    listener.onChange(datumKey, dataStore.get(datumKey).value);
                    continue;
                }

                if (action == ApplyAction.DELETE) {
                    listener.onDelete(datumKey);
                    continue;
                }
            } catch (Throwable e) {
                Loggers.DISTRO.error("[NACOS-DISTRO] error while notifying listener of key: {}", datumKey, e);
            }
        }

        if (Loggers.DISTRO.isDebugEnabled()) {
            Loggers.DISTRO
                .debug("[NACOS-DISTRO] datum change notified, key: {}, listener count: {}, action: {}",
                       datumKey, count, action.name());
        }
    } catch (Throwable e) {
        Loggers.DISTRO.error("[NACOS-DISTRO] Error while handling notifying task", e);
    }
}

listener的onChange或者onDelete。 这里的listener是Service类:

@Override
public void onChange(String key, Instances value) throws Exception {
    
    Loggers.SRV_LOG.info("[NACOS-RAFT] datum is changed, key: {}, value: {}", key, value);
    
    for (Instance instance : value.getInstanceList()) {
        
        if (instance == null) {
            // Reject this abnormal instance list:
            throw new RuntimeException("got null instance " + key);
        }
        
        if (instance.getWeight() > 10000.0D) {
            instance.setWeight(10000.0D);
        }
        
        if (instance.getWeight() < 0.01D && instance.getWeight() > 0.0D) {
            instance.setWeight(0.01D);
        }
    }
    
    updateIPs(value.getInstanceList(), KeyBuilder.matchEphemeralInstanceListKey(key));
    
    recalculateChecksum();
}

然后是updateIPs方法:

public void updateIPs(Collection<Instance> instances, boolean ephemeral) {
    Map<String, List<Instance>> ipMap = new HashMap<>(clusterMap.size());
    for (String clusterName : clusterMap.keySet()) {
        ipMap.put(clusterName, new ArrayList<>());
    }
    
    for (Instance instance : instances) {
        try {
            if (instance == null) {
                Loggers.SRV_LOG.error("[NACOS-DOM] received malformed ip: null");
                continue;
            }
            
            if (StringUtils.isEmpty(instance.getClusterName())) {
                instance.setClusterName(UtilsAndCommons.DEFAULT_CLUSTER_NAME);
            }
            
            if (!clusterMap.containsKey(instance.getClusterName())) {
                Loggers.SRV_LOG
                        .warn("cluster: {} not found, ip: {}, will create new cluster with default configuration.",
                                instance.getClusterName(), instance.toJson());
                Cluster cluster = new Cluster(instance.getClusterName(), this);
                cluster.init();
                getClusterMap().put(instance.getClusterName(), cluster);
            }
            
            List<Instance> clusterIPs = ipMap.get(instance.getClusterName());
            if (clusterIPs == null) {
                clusterIPs = new LinkedList<>();
                ipMap.put(instance.getClusterName(), clusterIPs);
            }
            
            clusterIPs.add(instance);
        } catch (Exception e) {
            Loggers.SRV_LOG.error("[NACOS-DOM] failed to process ip: " + instance, e);
        }
    }
    
    for (Map.Entry<String, List<Instance>> entry : ipMap.entrySet()) {
        //make every ip mine
        List<Instance> entryIPs = entry.getValue();
        clusterMap.get(entry.getKey()).updateIps(entryIPs, ephemeral);
    }
    
    setLastModifiedMillis(System.currentTimeMillis());
    // PushService的serviceChange,发布ServiceChangeEvent事件
    getPushService().serviceChanged(this);
    ...
    
}

更新信息,然后发布ServiceChangeEvent事件:

PushService的onApplicationEvent方法监听该事件,内容之前分析过了,就是UDP消息发给客户端。

distroProtocol.sync

AP一致性同步

public void sync(DistroKey distroKey, ApplyAction action, long delay) {
    for (Member each : memberManager.allMembersWithoutSelf()) {
        DistroKey distroKeyWithTarget = new DistroKey(distroKey.getResourceKey(), distroKey.getResourceType(),
                each.getAddress());
        DistroDelayTask distroDelayTask = new DistroDelayTask(distroKeyWithTarget, action, delay);
        // holder持有临时同步延迟执行器引擎,引擎中有NacosTaskProcessor,临时一致性情况下实际上持有的是DistroDelayTaskProcessor,添加任务后最终由processor执行
        distroTaskEngineHolder.getDelayTaskExecuteEngine().addTask(distroKeyWithTarget, distroDelayTask);
        if (Loggers.DISTRO.isDebugEnabled()) {
            Loggers.DISTRO.debug("[DISTRO-SCHEDULE] {} to {}", distroKey, each.getAddress());
        }
    }
}

我们看一下这个DistroDelayTaskProcessor执行器的process方法:

@Override
public boolean process(AbstractDelayTask task) {
    if (!(task instanceof DistroDelayTask)) {
        return true;
    }
    DistroDelayTask distroDelayTask = (DistroDelayTask) task;
    DistroKey distroKey = distroDelayTask.getDistroKey();
    // 发起临时一致性同步任务
    if (ApplyAction.CHANGE.equals(distroDelayTask.getAction())) {
        DistroSyncChangeTask syncChangeTask = new DistroSyncChangeTask(distroKey, distroComponentHolder);
        distroTaskEngineHolder.getExecuteWorkersManager().dispatch(distroKey, syncChangeTask);
        return true;
    }
    return false;
}

提交了一个DistroSyncChangeTask任务

@Override
public void run() {
    Loggers.DISTRO.info("[DISTRO-START] {}", toString());
    try {
        String type = getDistroKey().getResourceType();
        DistroData distroData = distroComponentHolder.findDataStorage(type).getDistroData(getDistroKey());
        distroData.setType(ApplyAction.CHANGE);
        // syncData执行数据同步,交由 NamingProxy.syncData执行 /nacos/v1/ns/distro/datum
        boolean result = distroComponentHolder.findTransportAgent(type).syncData(distroData, getDistroKey().getTargetServer());
        // 同步失败重试,就是重新提交任务
        if (!result) {
            handleFailedTask();
        }
        Loggers.DISTRO.info("[DISTRO-END] {} result: {}", toString(), result);
    } catch (Exception e) {
        Loggers.DISTRO.warn("[DISTRO] Sync data change failed.", e);
        handleFailedTask();
    }
}

提交消息给集群中其他节点。

我们看一下接收方DistroController的onSyncDatum方法:

@PutMapping("/datum")
public ResponseEntity onSyncDatum(@RequestBody Map<String, Datum<Instances>> dataMap) throws Exception {
    
    if (dataMap.isEmpty()) {
        Loggers.DISTRO.error("[onSync] receive empty entity!");
        throw new NacosException(NacosException.INVALID_PARAM, "receive empty entity!");
    }
    
    for (Map.Entry<String, Datum<Instances>> entry : dataMap.entrySet()) {
        if (KeyBuilder.matchEphemeralInstanceListKey(entry.getKey())) {
            String namespaceId = KeyBuilder.getNamespace(entry.getKey());
            String serviceName = KeyBuilder.getServiceName(entry.getKey());
            if (!serviceManager.containService(namespaceId, serviceName) && switchDomain
                    .isDefaultInstanceEphemeral()) {
                serviceManager.createEmptyService(namespaceId, serviceName, true);
            }
            DistroHttpData distroHttpData = new DistroHttpData(createDistroKey(entry.getKey()), null,
                    entry.getValue());
            // 有临时一致性协议处理器处理请求数据,最终是调用到DistroConsistencyServiceImpl的onPut方法放入到缓存中,然后udp通知客户端更新
            distroProtocol.onReceive(distroHttpData);
        }
    }
    return ResponseEntity.ok("ok");
}

然后是distroProtocol.onReceive:

public void onReceive(DistroData distroData) {
    String resourceType = distroData.getDistroKey().getResourceType();
    DistroDataProcessor dataProcessor = distroComponentHolder.findDataProcessor(resourceType);
    if (null == dataProcessor) {
        Loggers.DISTRO.warn("[DISTRO] Can't find data process for received data {}", resourceType);
        return;
    }
    dataProcessor.processData(distroData);
}

此时这个处理器是DistroConsistencyServiceImpl实现的,即最终一致性服务实现类,我们看一下processData:

@Override
public void processData(DistroData distroData) {
    DistroHttpData distroHttpData = (DistroHttpData) distroData;
    Datum<Instances> datum = (Datum<Instances>) distroHttpData.getDeserializedContent();
    onPut(datum.key, datum.value);
}

可以看到,执行的是onPut。走前面分析的onPut逻辑。

相关文章