motan源码分析八:涉及到底层的客户端调用

x33g5p2x  于2021-12-21 转载在 其他  
字(4.0k)|赞(0)|评价(0)|浏览(323)

之前我们分析了客户端调用服务端的源码,但是没有涉及到通讯层和序列化层,本文将之前讲过的内容做一次串联。

1.上层通过动态代理调用refer的call,每个refer又对应一个nettyclient,下面来看一下nettyclient的调用服务端操作

private  Response request(Request request,  boolean  async)  throws  TransportException {
     Channel channel =  null ;
 
     Response response =  null ;
 
     try  {
         // return channel or throw exception(timeout or connection_fail)
         channel = borrowObject(); //向连接池拿连接
 
         if  (channel ==  null ) {
             LoggerUtil.error( "NettyClient borrowObject null: url="  + url.getUri() +  " "
                     + MotanFrameworkUtil.toString(request));
             return  null ;
         }
 
         // async request
         response = channel.request(request); //调用channel的request
         // return channel to pool
         returnObject(channel); //归还连接
     }  catch  (Exception e) {
         LoggerUtil.error(
                 "NettyClient request Error: url="  + url.getUri() +  " "  + MotanFrameworkUtil.toString(request), e);
         //TODO 对特定的异常回收channel
         invalidateObject(channel); //销毁坏的连接
 
         if  (e  instanceof  MotanAbstractException) {
             throw  (MotanAbstractException) e;
         }  else  {
             throw  new  MotanServiceException( "NettyClient request Error: url="  + url.getUri() +  " "
                     + MotanFrameworkUtil.toString(request), e);
         }
     }
 
     // aysnc or sync result
     response = asyncResponse(response, async); //处理response
 
     return  response;
}

2.nettychannel的request操作

public  Response request(Request request)  throws  TransportException {
     int  timeout = nettyClient.getUrl().getMethodParameter(request.getMethodName(), request.getParamtersDesc(),
             URLParamType.requestTimeout.getName(), URLParamType.requestTimeout.getIntValue());
     if  (timeout <=  0 ) {
            throw  new  MotanFrameworkException( "NettyClient init Error: timeout("  + timeout +  ") <= 0 is forbid." ,
                    MotanErrorMsgConstant.FRAMEWORK_INIT_ERROR);
        }
     NettyResponseFuture response =  new  NettyResponseFuture(request, timeout,  this .nettyClient); //创建异步response对象
     this .nettyClient.registerCallback(request.getRequestId(), response); //将此response存入到map,处理完后,会移出
     ChannelFuture writeFuture =  this .channel.write(request); //向服务端传递request对象,写之前会进行序列化的操作
 
     boolean  result = writeFuture.awaitUninterruptibly(timeout, TimeUnit.MILLISECONDS); //标识是否成功
 
     if  (result && writeFuture.isSuccess()) {
         response.addListener( new  FutureListener() { //增加response的监听器
             @Override
             public  void  operationComplete(Future future)  throws  Exception {
                 if  (future.isSuccess() || (future.isDone() && ExceptionUtil.isBizException(future.getException()))) {
                     // 成功的调用
                     nettyClient.resetErrorCount(); //成功
                 }  else  {
                     // 失败的调用
                     nettyClient.incrErrorCount(); //对失败次数+1,如果同一个client连续失败达到所有的连接次数时,标识此client不可用,由心跳管理器负责恢复此client的可用状态
                 }
             }
         });
         return  response; //返回此response,此response为异步的response,由业务线程接手后续接收的过程
     }
 
     writeFuture.cancel();
     response =  this .nettyClient.removeCallback(request.getRequestId()); //在map中移出此response
 
     if  (response !=  null ) {
         response.cancel();
     }
 
     // 失败的调用
     nettyClient.incrErrorCount();
 
     if  (writeFuture.getCause() !=  null ) {
         throw  new  MotanServiceException( "NettyChannel send request to server Error: url="
                 + nettyClient.getUrl().getUri() +  " local="  + localAddress +  " "
                 + MotanFrameworkUtil.toString(request), writeFuture.getCause());
     }  else  {
         throw  new  MotanServiceException( "NettyChannel send request to server Timeout: url="
                 + nettyClient.getUrl().getUri() +  " local="  + localAddress +  " "
                 + MotanFrameworkUtil.toString(request));
     }
}

3.异步的response NettyResponseFuture

public  Object getValue() {
     synchronized  (lock) {
         if  (!isDoing()) {
             return  getValueOrThrowable(); //返回成功值或失败
         }
 
         if  (timeout <=  0 ) {
             try  {
                 lock.wait(); //未接收完毕则一直等待
             }  catch  (Exception e) {
                 cancel( new  MotanServiceException( "NettyResponseFuture getValue InterruptedException : "
                         + MotanFrameworkUtil.toString(request) +  " cost="
                         + (System.currentTimeMillis() - createTime), e));
             }
 
             // don't need to notifylisteners, because onSuccess or
             // onFailure or cancel method already call notifylisteners
             return  getValueOrThrowable();
         }  else  {
             long  waitTime = timeout - (System.currentTimeMillis() - createTime); //等待的时间
 
             if  (waitTime >  0 ) {
                 for  (;;) {
                     try  {
                         lock.wait(waitTime); //要么被通知,要么超时
                     }  catch  (InterruptedException e) {
                     }
 
                     if  (!isDoing()) {
                         break ;
                     }  else  {
                         waitTime = timeout - (System.currentTimeMillis() - createTime);
                         if  (waitTime <=  0 ) {
                             break ;
                         }
                     }
                 }
             }
 
             if  (isDoing()) {
                 timeoutSoCancel();
             }
         }
         return  getValueOrThrowable();
     }
}

本章知识点:

1.motan通过NettyResponseFuture来实现在框架层面异步处理同一笔业务,提升了框架的性能;

2.对于连续失败的client,进行下线操作。

上一篇:七:序列化
下一篇:九:开关

相关文章