motan提供了流量切换的功能,可以实现把一个group的流量切换到另一个group(一个或多个服务都可以)。大家可以使用tomcat部署motan的管理工具,并设置几个组,例如可以参考demo代码:motan_demo_server_commandRegistry.xml。分析源码时可以发现,流量切换是在客户端完成的,与服务端没什么关系。在实际的工作中,它可以解决很多问题,例如:某个集群出了问题,可以马上将流量切换到其它集群;在系统升级的过程中,将待升级集群的流量切换到其它集群,从而实现24小时随时升级等。
1.motan的流量切换是通过command来实现的,每次我们在motan管理器上进行设置的时候,其实是写入信息到注册中心的command节点,而motan又监听了这些command节点,下面是motan的客户端监听command相关的代码:
protected voidsubscribeCommand(final URL url, final CommandListener commandListener) {
try{
clientLock.lock();//对clientLock进行上锁
ConcurrentHashMap<CommandListener, IZkDataListener> dataChangeListeners = commandListeners.get(url);//数据变更监听器
if (dataChangeListeners == null) {
commandListeners.putIfAbsent(url, new ConcurrentHashMap<CommandListener, IZkDataListener>());
dataChangeListeners = commandListeners.get(url);
}
IZkDataListener zkDataListener = dataChangeListeners.get(commandListener);
if (zkDataListener == null) {
dataChangeListeners.putIfAbsent(commandListener, newIZkDataListener() {//增加新的listener
@Override
public voidhandleDataChange(String dataPath, Object data) throws Exception {
commandListener.notifyCommand(url, (String) data);//调用commandListener的notifyCommand方法
LoggerUtil.info(String.format("[ZookeeperRegistry] command data change: path=%s, command=%s", dataPath, (String) data));
}
@Override
public voidhandleDataDeleted(String dataPath) throws Exception {
commandListener.notifyCommand(url, null);
LoggerUtil.info(String.format("[ZookeeperRegistry] command deleted: path=%s", dataPath));
}
});
zkDataListener = dataChangeListeners.get(commandListener);
}
String commandPath =ZkUtils.toCommandPath(url);
zkClient.subscribeDataChanges(commandPath, zkDataListener);//向zookeeper注册监听事件
LoggerUtil.info(String.format("[ZookeeperRegistry] subscribe command: path=%s, info=%s", commandPath, url.toFullStr()));
} catch(Throwable e) {
throw new MotanFrameworkException(String.format("Failed to subscribe %s to zookeeper(%s), cause: %s", url, getUrl(), e.getMessage()), e);
} finally{
clientLock.unlock();
}
}
2.CommandServiceManager实现了上节中的commandListener
/**
 * Reacts to a command notification for {@code serviceUrl}.
 *
 * <p>The command is treated as empty when the command switcher is closed or when
 * {@code commandString} is {@code null}. If the resulting command equals the cached command string
 * nothing else happens. Otherwise the cache is refreshed, the service URLs are re-discovered,
 * cached groups that no longer appear in the computed weights are dropped and unsubscribed, every
 * listener in {@code notifySet} is notified, and, when the effective command is empty, the
 * manager's own {@code refUrl} is subscribed again.
 *
 * @param serviceUrl    URL of the service the command applies to
 * @param commandString raw command text; may be {@code null}
 */
public void notifyCommand(URL serviceUrl, String commandString) {
    LoggerUtil.info("CommandServiceManager notify command. service:" + serviceUrl.toSimpleString() + ", command:" + commandString);

    // A closed switcher or a missing command both collapse to the empty command.
    boolean switcherOpen = MotanSwitcherUtil.isOpen(MOTAN_COMMAND_SWITCHER);
    if (!switcherOpen || commandString == null) {
        LoggerUtil.info("command reset empty since swither is close.");
        commandString = "";
    }

    List<URL> discovered = new ArrayList<URL>();
    URL baseUrl = serviceUrl.createCopy();

    // Same command as last time: nothing to do.
    if (StringUtils.equals(commandString, commandStringCache)) {
        LoggerUtil.info("command not change. url:" + serviceUrl.toSimpleString());
        return;
    }

    commandStringCache = commandString;
    commandCache = RpcCommandUtil.stringToCommand(commandStringCache);
    Map<String, Integer> groupWeights = new HashMap<String, Integer>();

    if (commandCache == null) {
        // A non-blank command that failed to parse is handled as "no command" so that a bad
        // command cannot break the service.
        if (StringUtils.isNotBlank(commandString)) {
            LoggerUtil.warn("command parse fail, ignored! command:" + commandString);
            commandString = "";
        }
        // Without a command only this manager's own group is returned.
        discovered.addAll(discoverOneGroup(refUrl));
    } else {
        commandCache.sort();
        discovered = discoverServiceWithCommand(refUrl, groupWeights, commandCache);
    }

    // The command changed: drop cache entries for groups that are no longer weighted and
    // unsubscribe from them.
    for (String groupKey : groupServiceCache.keySet()) {
        if (groupWeights.containsKey(groupKey)) {
            continue;
        }
        groupServiceCache.remove(groupKey);
        URL staleGroupUrl = baseUrl.createCopy();
        staleGroupUrl.addParameter(URLParamType.group.getName(), groupKey);
        registry.unsubscribeService(staleGroupUrl, this);
    }

    for (NotifyListener listener : notifySet) {
        listener.notify(registry.getUrl(), discovered);
    }

    // Going from "some command" to "no command" unsubscribes every group above, so the
    // manager's own group has to be subscribed again.
    if (commandString.isEmpty()) {
        LoggerUtil.info("reSub service" + refUrl.toSimpleString());
        registry.subscribeService(refUrl, this);
    }
}
3.discoverServiceWithCommand的相关代码
/**
 * Discovers the URLs for {@code serviceUrl}, applying the first client command whose pattern
 * matches the service path.
 *
 * <p>When {@code rpcCommand} is {@code null}, has no client commands, or none of its client
 * commands matches, the result of {@code discoverOneGroup(serviceUrl)} is returned. For the
 * matching command, the URLs come from {@code mergeResult} when the command lists merge groups and
 * from {@code discoverOneGroup} otherwise; the command's route rules are then applied to that list.
 *
 * @param serviceUrl URL of the service being discovered
 * @param weights    map handed to {@code buildWeightsMap} and {@code mergeResult}
 * @param rpcCommand parsed command; may be {@code null}
 * @param localIP    address of this client, compared against the "from" side of each route rule
 * @return the discovered URLs after merge and route filtering
 */
public List<URL> discoverServiceWithCommand(URL serviceUrl, Map<String, Integer> weights, RpcCommand rpcCommand, String localIP) {
    if (rpcCommand == null || CollectionUtil.isEmpty(rpcCommand.getClientCommandList())) {
        return discoverOneGroup(serviceUrl);
    }

    List<URL> mergedResult = new LinkedList<URL>();
    String path = serviceUrl.getPath();
    List<RpcCommand.ClientCommand> clientCommandList = rpcCommand.getClientCommandList();
    boolean hit = false;

    for (RpcCommand.ClientCommand command : clientCommandList) {
        mergedResult = new LinkedList<URL>();

        // Skip commands whose pattern does not cover this service path.
        if (!RpcCommandUtil.match(command.getPattern(), path)) {
            continue;
        }
        hit = true;

        if (!CollectionUtil.isEmpty(command.getMergeGroups())) {
            // Work out every group to merge and its weight.
            try {
                buildWeightsMap(weights, command);
            } catch (MotanFrameworkException e) {
                LoggerUtil.warn("build weights map fail!" + e.getMessage());
                // NOTE(review): hit stays true here, so if no later command matches, the
                // method returns an empty list rather than falling back to discoverOneGroup.
                // Behaviour kept as-is; confirm it is intended.
                continue;
            }
            // Discover each weighted group's services and merge them.
            mergedResult.addAll(mergeResult(serviceUrl, weights));
        } else {
            mergedResult.addAll(discoverOneGroup(serviceUrl));
        }
        LoggerUtil.info("mergedResult: size-" + mergedResult.size() + " --- " + mergedResult.toString());

        if (!CollectionUtil.isEmpty(command.getRouteRules())) {
            LoggerUtil.info("router: " + command.getRouteRules().toString());
            applyRouteRules(mergedResult, command, localIP);
        }

        // Only the first matching command is applied.
        // TODO: consider whether this covers the vast majority of use cases.
        break;
    }

    if (!hit) {
        return discoverOneGroup(serviceUrl);
    }
    return new ArrayList<URL>(mergedResult);
}

/**
 * Applies each "from to target" route rule of {@code command} to {@code candidates} in place:
 * for every rule whose "from" side matches {@code localIP}, URLs whose host does not match the
 * "to" side are removed (URLs with protocol "rule" are always kept).
 */
private void applyRouteRules(List<URL> candidates, RpcCommand.ClientCommand command, String localIP) {
    for (String routeRule : command.getRouteRules()) {
        String[] fromTo = routeRule.replaceAll("\\s+", "").split("to");
        if (fromTo.length != 2) {
            routeRuleConfigError();
            continue;
        }

        String from = fromTo[0];
        String to = fromTo[1];
        if (from.length() < 1 || to.length() < 1 || !IP_PATTERN.matcher(from).find() || !IP_PATTERN.matcher(to).find()) {
            routeRuleConfigError();
            continue;
        }

        // A leading '!' negates that side of the rule.
        boolean oppositeFrom = from.startsWith("!");
        boolean oppositeTo = to.startsWith("!");
        if (oppositeFrom) {
            from = from.substring(1);
        }
        if (oppositeTo) {
            to = to.substring(1);
        }

        boolean matchFrom = matchesAddress(localIP, from);
        if (oppositeFrom) {
            matchFrom = !matchFrom;
        }
        LoggerUtil.info("matchFrom: " + matchFrom + ", localip:" + localIP + ", from:" + from);
        if (!matchFrom) {
            continue;
        }

        Iterator<URL> iterator = candidates.iterator();
        while (iterator.hasNext()) {
            URL url = iterator.next();
            if (url.getProtocol().equalsIgnoreCase("rule")) {
                continue;
            }
            boolean matchTo = matchesAddress(url.getHost(), to);
            if (oppositeTo) {
                matchTo = !matchTo;
            }
            if (!matchTo) {
                iterator.remove();
                LoggerUtil.info("router To not match. url remove : " + url.toSimpleString());
            }
        }
    }
}

/**
 * Returns whether {@code address} matches {@code pattern}: a prefix match on the text before the
 * first '*' when the pattern contains one, an exact match otherwise.
 */
private boolean matchesAddress(String address, String pattern) {
    int wildcard = pattern.indexOf('*');
    if (wildcard != -1) {
        return address.startsWith(pattern.substring(0, wildcard));
    }
    return address.equals(pattern);
}
版权说明 : 本文为转载文章, 版权归原作者所有 版权申明
原文链接 : https://blog.csdn.net/a1439226817/article/details/68483502
内容来源于网络,如有侵权,请联系作者删除!