motan源码分析十:流量切换

x33g5p2x  于2021-12-21 转载在 其他  
字(6.5k)|赞(0)|评价(0)|浏览(309)

motan提供了流量切换的功能,可以实现把一个group的流量切换到另一个group(一个或多个服务都可以)。大家可以使用tomcat部署motan的管理工具,并设置几个组,例如可以参考demo代码:motan_demo_server_commandRegistry.xml。分析源码时可以发现,流量切换是在客户端完成的,与服务端没什么关系,在实际的工作中,可以解决很多问题,例如:某个集群出了问题,可以马上将流量切换到其它集群;在系统升级的过程中,将带升级集群的流量切换到其它集群,实现了24小时随时升级等。

1.motan的流量切换是通过command来实现的,每次我们在motan管理器上进行设置的时候,其实是写入信息到注册中心的command节点,而motan又监听了这些command节点,下面是motan的客户端监听command相关的代码

protected voidsubscribeCommand(final URL url, final CommandListener commandListener) {
        try{
            clientLock.lock();//对clientLock进行上锁
            ConcurrentHashMap<CommandListener, IZkDataListener> dataChangeListeners = commandListeners.get(url);//数据变更监听器
            if (dataChangeListeners == null) {
                commandListeners.putIfAbsent(url, new ConcurrentHashMap<CommandListener, IZkDataListener>());
                dataChangeListeners = commandListeners.get(url);
            }
            IZkDataListener zkDataListener = dataChangeListeners.get(commandListener);
            if (zkDataListener == null) {
                dataChangeListeners.putIfAbsent(commandListener, newIZkDataListener() {//增加新的listener
                    @Override
                    public voidhandleDataChange(String dataPath, Object data) throws Exception {
                        commandListener.notifyCommand(url, (String) data);//调用commandListener的notifyCommand方法
                        LoggerUtil.info(String.format("[ZookeeperRegistry] command data change: path=%s, command=%s", dataPath, (String) data));
                    }

                    @Override
                    public voidhandleDataDeleted(String dataPath) throws Exception {
                        commandListener.notifyCommand(url, null);
                        LoggerUtil.info(String.format("[ZookeeperRegistry] command deleted: path=%s", dataPath));
                    }
                });
                zkDataListener = dataChangeListeners.get(commandListener);
            }

            String commandPath =ZkUtils.toCommandPath(url);
            zkClient.subscribeDataChanges(commandPath, zkDataListener);//向zookeeper注册监听事件
            LoggerUtil.info(String.format("[ZookeeperRegistry] subscribe command: path=%s, info=%s", commandPath, url.toFullStr()));
        } catch(Throwable e) {
            throw new MotanFrameworkException(String.format("Failed to subscribe %s to zookeeper(%s), cause: %s", url, getUrl(), e.getMessage()), e);
        } finally{
            clientLock.unlock();
        }
    }

2.CommandServiceManager实现了上节中的commandListener

public  void  notifyCommand(URL serviceUrl, String commandString) {
     LoggerUtil.info( "CommandServiceManager notify command. service:"  + serviceUrl.toSimpleString() +  ", command:"  + commandString);
 
     if  (!MotanSwitcherUtil.isOpen(MOTAN_COMMAND_SWITCHER) || commandString ==  null ) { //判断命令开关是否打开
         LoggerUtil.info( "command reset empty since swither is close." );
         commandString =  "" ;
     }
 
     List<URL> finalResult =  new  ArrayList<URL>();
     URL urlCopy = serviceUrl.createCopy(); //serviceurl的副本
 
     if  (!StringUtils.equals(commandString, commandStringCache)) {
         commandStringCache = commandString;
         commandCache = RpcCommandUtil.stringToCommand(commandStringCache); //将字符串转换为命令
         Map<String, Integer> weights =  new  HashMap<String, Integer>();
 
         if  (commandCache !=  null ) {
             commandCache.sort();
             finalResult = discoverServiceWithCommand(refUrl, weights, commandCache);
         }  else  {
             // 如果是指令有异常时,应当按没有指令处理,防止错误指令导致服务异常
             if  (StringUtils.isNotBlank(commandString)) {
                 LoggerUtil.warn( "command parse fail, ignored! command:"  + commandString);
                 commandString =  "" ;
             }
             // 没有命令时,只返回这个manager实际group对应的结果
             finalResult.addAll(discoverOneGroup(refUrl));
 
         }
 
         // 指令变化时,删除不再有效的缓存,取消订阅不再有效的group
         Set<String> groupKeys = groupServiceCache.keySet();
         for  (String gk : groupKeys) {
             if  (!weights.containsKey(gk)) {
                 groupServiceCache.remove(gk);
                 URL urlTemp = urlCopy.createCopy();
                 urlTemp.addParameter(URLParamType.group.getName(), gk);
                 registry.unsubscribeService(urlTemp,  this );
             }
         }
     }  else  {
         LoggerUtil.info( "command not change. url:"  + serviceUrl.toSimpleString());
         // 指令没有变化,什么也不做
         return ;
     }
 
     for  (NotifyListener notifyListener : notifySet) {
         notifyListener.notify(registry.getUrl(), finalResult);
     }
 
     // 当指令从有改到无时,会触发取消订阅所有的group,需要重新订阅本组的service
     if  ( "" .equals(commandString)) {
         LoggerUtil.info( "reSub service"  + refUrl.toSimpleString());
         registry.subscribeService(refUrl,  this );
     }
}

3.discoverServiceWithCommand的相关代码

public  List<URL> discoverServiceWithCommand(URL serviceUrl, Map<String, Integer> weights, RpcCommand rpcCommand, String localIP) {
     if  (rpcCommand ==  null  || CollectionUtil.isEmpty(rpcCommand.getClientCommandList())) {
         return  discoverOneGroup(serviceUrl);
     }
 
     List<URL> mergedResult =  new  LinkedList<URL>();
     String path = serviceUrl.getPath(); //获取路径
 
     List<RpcCommand.ClientCommand> clientCommandList = rpcCommand.getClientCommandList();
     boolean  hit =  false ;
     for  (RpcCommand.ClientCommand command : clientCommandList) {
         mergedResult =  new  LinkedList<URL>();
         // 判断当前url是否符合过滤条件
         boolean  match = RpcCommandUtil.match(command.getPattern(), path);
         if  (match) {
             hit =  true ;
             if  (!CollectionUtil.isEmpty(command.getMergeGroups())) {
                 // 计算出所有要合并的分组及权重
                 try  {
                     buildWeightsMap(weights, command);
                 }  catch  (MotanFrameworkException e) {
                     LoggerUtil.warn( "build weights map fail!"  + e.getMessage());
                     continue ;
                 }
                 // 根据计算结果,分别发现各个group的service,合并结果
                 mergedResult.addAll(mergeResult(serviceUrl, weights));
             }  else  {
                 mergedResult.addAll(discoverOneGroup(serviceUrl));
             }
 
             LoggerUtil.info( "mergedResult: size-"  + mergedResult.size() +  " --- "  + mergedResult.toString());
 
             if  (!CollectionUtil.isEmpty(command.getRouteRules())) {
                 LoggerUtil.info( "router: "  + command.getRouteRules().toString());
 
                 for  (String routeRule : command.getRouteRules()) {
                     String[] fromTo = routeRule.replaceAll( "\\s+" ,  "" ).split( "to" );
 
                     if  (fromTo.length !=  2 ) {
                         routeRuleConfigError();
                         continue ;
                     }
                     String from = fromTo[ 0 ];
                     String to = fromTo[ 1 ];
                     if  (from.length() <  1  || to.length() <  1  || !IP_PATTERN.matcher(from).find() || !IP_PATTERN.matcher(to).find()) {
                         routeRuleConfigError();
                         continue ;
                     }
                     boolean  oppositeFrom = from.startsWith( "!" );
                     boolean  oppositeTo = to.startsWith( "!" );
                     if  (oppositeFrom) {
                         from = from.substring( 1 );
                     }
                     if  (oppositeTo) {
                         to = to.substring( 1 );
                     }
                     int  idx = from.indexOf( '*' );
                     boolean  matchFrom;
                     if  (idx != - 1 ) {
                         matchFrom = localIP.startsWith(from.substring( 0 , idx));
                     }  else  {
                         matchFrom = localIP.equals(from);
                     }
 
                     // 开头有!,取反
                     if  (oppositeFrom) {
                         matchFrom = !matchFrom;
                     }
                     LoggerUtil.info( "matchFrom: "  + matchFrom +  ", localip:"  + localIP +  ", from:"  + from);
                     if  (matchFrom) {
                         boolean  matchTo;
                         Iterator<URL> iterator = mergedResult.iterator();
                         while  (iterator.hasNext()) {
                             URL url = iterator.next();
                             if  (url.getProtocol().equalsIgnoreCase( "rule" )) {
                                 continue ;
                             }
                             idx = to.indexOf( '*' );
                             if  (idx != - 1 ) {
                                 matchTo = url.getHost().startsWith(to.substring( 0 , idx));
                             }  else  {
                                 matchTo = url.getHost().equals(to);
                             }
                             if  (oppositeTo) {
                                 matchTo = !matchTo;
                             }
                             if  (!matchTo) {
                                 iterator.remove();
                                 LoggerUtil.info( "router To not match. url remove : "  + url.toSimpleString());
                             }
                         }
                     }
                 }
             }
             // 只取第一个匹配的 TODO 考虑是否能满足绝大多数场景需求
             break ;
         }
     }
 
     List<URL> finalResult =  new  ArrayList<URL>();
     if  (!hit) {
         finalResult = discoverOneGroup(serviceUrl);
     }  else  {
         finalResult.addAll(mergedResult);
     }
     return  finalResult;
}
上一篇:九:开关

相关文章