motan源码分析六:客户端与服务器的通信层分析

x33g5p2x  于2021-12-21 转载在 其他  
字(8.1k)|赞(0)|评价(0)|浏览(330)

本章将分析motan客户端与服务器之间底层通信相关部分的代码(序列化部分将在下一篇中分析)。

1.在上一章中,有一个getrefers的操作,来获取所有服务器的引用,每个服务器的引用都是由DefaultRpcReferer来创建的

public DefaultRpcReferer(Class<T>clz, URL url, URL serviceUrl) {
            super(clz, url, serviceUrl);

            endpointFactory =ExtensionLoader.getExtensionLoader(EndpointFactory.class).getExtension(
                            url.getParameter(URLParamType.endpointFactory.getName(), URLParamType.endpointFactory.getValue()));//通过spi加载NettyEndpointFactory

            client =endpointFactory.createClient(url);//创建client
        }

2.NettyClient的创建过程及源码分析

/**
 * Creates a client for the given URL and registers it with the heartbeat endpoint manager.
 *
 * @param url URL describing the remote endpoint
 * @return the newly created client
 */
public Client createClient(URL url) {
    LoggerUtil.info(this.getClass().getSimpleName() + " create client: url={}", url);
    // Delegate to the two-argument overload, using the heartbeat manager for clients.
    return createClient(url, heartbeatClientEndpointManager);
}

    privateClient createClient(URL url, EndpointManager endpointManager) {
        Client client =innerCreateClient(url);//调用NettyEndpointFactory的创建client的方法

        endpointManager.addEndpoint(client);//添加心跳管理

        returnclient;
    }

    /**
     * Builds the concrete client for this factory.
     *
     * @param url URL describing the remote endpoint
     * @return a new {@code NettyClient} for {@code url}
     */
    protected Client innerCreateClient(URL url) {
        return new NettyClient(url);
    }

    /**
     * Creates a client for the given URL, reads the maximum connection count and schedules
     * the periodic timeout monitor.
     *
     * @param url URL describing the remote endpoint; forwarded to the superclass
     */
    public NettyClient(URL url) {
        super(url);

        maxClientConnection = url.getIntParameter(URLParamType.maxClientConnection.getName(),
                URLParamType.maxClientConnection.getIntValue());

        // Run a TimeoutMonitor with a fixed delay; the initial delay and the period are both
        // MotanConstants.NETTY_TIMEOUT_TIMER_PERIOD milliseconds.
        timeMonitorFuture = scheduledExecutor.scheduleWithFixedDelay(
                new TimeoutMonitor("timeout_monitor_" + url.getHost() + "_" + url.getPort()),
                MotanConstants.NETTY_TIMEOUT_TIMER_PERIOD, MotanConstants.NETTY_TIMEOUT_TIMER_PERIOD,
                TimeUnit.MILLISECONDS);
        LoggerUtil.info("client's:" + url.getUri());
    }

3.Netty相关的连接建立是通过open()方法进行的

/**
 * Opens this client: builds the Netty bootstrap, initializes the connection pool,
 * registers the statistics callback and marks the client as alive.
 *
 * @return {@code true} if the client was already available; otherwise the value of
 *         {@code state.isAliveState()} after the state has been set to {@code ALIVE}
 */
public synchronized boolean open() {
    if (!isAvailable()) {
        // Netty client bootstrap first, then the connection pool that uses it.
        initClientBootstrap();
        initPool();

        LoggerUtil.info("NettyClient finish Open: url={}", url);

        // Register the statistics callback.
        StatsUtil.registryStatisticCallback(this);

        // Mark the client as usable.
        state = ChannelState.ALIVE;
        return state.isAliveState();
    }

    // Already available: nothing to initialize.
    return true;
}
/**
 * Builds the Netty {@code ClientBootstrap} used by this client.
 *
 * <p>Enables keep-alive and TCP no-delay, sets the connect timeout, and installs a pipeline
 * made of a decoder, an encoder and a handler that completes the pending response future
 * matching each incoming response.
 *
 * @throws MotanFrameworkException if the configured connect timeout is not positive
 */
private void initClientBootstrap() {
    bootstrap = new ClientBootstrap(channelFactory);

    bootstrap.setOption("keepAlive", true);
    bootstrap.setOption("tcpNoDelay", true);

    // In practice, in extreme cases the connect timeout can reach 500ms, because Netty's NIO
    // implementation relies on the boss thread to enforce it; a strict timeout has to be
    // enforced by the application itself.
    //
    // The value comes from the connectTimeout parameter (not requestTimeout): it feeds the
    // socket-level "connectTimeoutMillis" option, and NettyChannel.open() waits on the same
    // parameter. Using the request timeout here would silently cap the configured connect timeout.
    int timeout = getUrl().getIntParameter(URLParamType.connectTimeout.getName(), URLParamType.connectTimeout.getIntValue());
    if (timeout <= 0) {
        throw new MotanFrameworkException("NettyClient init Error: timeout(" + timeout + ") <= 0 is forbid.",
                MotanErrorMsgConstant.FRAMEWORK_INIT_ERROR);
    }
    bootstrap.setOption("connectTimeoutMillis", timeout);

    // Upper bound on the size of a response packet.
    final int maxContentLength = url.getIntParameter(URLParamType.maxContentLength.getName(),
            URLParamType.maxContentLength.getIntValue());

    bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
        @Override
        public ChannelPipeline getPipeline() {
            ChannelPipeline pipeline = Channels.pipeline();
            pipeline.addLast("decoder", new NettyDecoder(codec, NettyClient.this, maxContentLength)); // decoder
            pipeline.addLast("encoder", new NettyEncoder(codec, NettyClient.this)); // encoder
            pipeline.addLast("handler", new NettyChannelHandler(NettyClient.this, new MessageHandler() { // business handler
                @Override
                public Object handle(Channel channel, Object message) {
                    Response response = (Response) message;

                    // Remove the pending callback for this request id; it receives the response.
                    NettyResponseFuture responseFuture = NettyClient.this.removeCallback(response.getRequestId());

                    if (responseFuture == null) {
                        // No pending future is registered for this request id.
                        LoggerUtil.warn(
                                "NettyClient has response from server, but resonseFuture not exist,  requestId={}",
                                response.getRequestId());
                        return null;
                    }

                    if (response.getException() != null) {
                        responseFuture.onFailure(response);
                    } else {
                        responseFuture.onSuccess(response);
                    }

                    return null;
                }
            }));
            return pipeline;
        }
    });
}

4.连接池

/**
 * Configures and creates the {@code GenericObjectPool} of channels for this client and,
 * unless lazy initialization is enabled, pre-creates {@code minIdle} connections.
 *
 * <p>A failure while pre-creating a connection is logged and does not abort initialization.
 */
protected void initPool() {
    // GenericObjectPool serves as the connection pool.
    poolConfig = new GenericObjectPool.Config();

    // Minimum number of connections (the original note says the configuration uses 2).
    int minConnections =
            url.getIntParameter(URLParamType.minClientConnection.getName(), URLParamType.minClientConnection.getIntValue());
    poolConfig.minIdle = minConnections;

    // Maximum number of connections (the original note says the configuration uses 10);
    // it bounds both idle and active objects.
    int maxConnections =
            url.getIntParameter(URLParamType.maxClientConnection.getName(), URLParamType.maxClientConnection.getIntValue());
    poolConfig.maxIdle = maxConnections;
    poolConfig.maxActive = maxConnections;

    poolConfig.maxWait = url.getIntParameter(URLParamType.requestTimeout.getName(), URLParamType.requestTimeout.getIntValue());
    poolConfig.lifo = url.getBooleanParameter(URLParamType.poolLifo.getName(), URLParamType.poolLifo.getBooleanValue());
    poolConfig.minEvictableIdleTimeMillis = defaultMinEvictableIdleTimeMillis;
    poolConfig.softMinEvictableIdleTimeMillis = defaultSoftMinEvictableIdleTimeMillis;
    poolConfig.timeBetweenEvictionRunsMillis = defaultTimeBetweenEvictionRunsMillis;

    // Factory that creates the pooled channels.
    factory = createChannelFactory();

    pool = new GenericObjectPool(factory, poolConfig);

    boolean lazyInit = url.getBooleanParameter(URLParamType.lazyInit.getName(), URLParamType.lazyInit.getBooleanValue());
    if (lazyInit) {
        // Connections will be created on demand.
        return;
    }

    // Eagerly create the minimum number of long-lived connections.
    for (int index = 0; index < minConnections; index++) {
        try {
            pool.addObject();
            LoggerUtil.info("init client's connection :" + index);
        } catch (Exception e) {
            LoggerUtil.error("NettyClient init pool create connect Error: url=" + url.getUri(), e);
        }
    }
}

5.NettyChannelFactory

/**
 * Pool object factory that creates opened {@link NettyChannel} instances for one {@link NettyClient}.
 */
public class NettyChannelFactory extends BasePoolableObjectFactory {
    /** Label of the form {@code NettyChannelFactory_<host>_<port>}, built from the client's URL. */
    private String factoryName = "";
    /** Client on whose behalf channels are created. */
    private NettyClient nettyClient;

    /**
     * @param nettyClient client whose URL supplies the host and port used in the factory name
     */
    public NettyChannelFactory(NettyClient nettyClient) {
        this.nettyClient = nettyClient;
        this.factoryName = new StringBuilder("NettyChannelFactory_")
                .append(nettyClient.getUrl().getHost())
                .append("_")
                .append(nettyClient.getUrl().getPort())
                .toString();
    }

    /**
     * Called when a new connection has to be created: builds a channel for the client and opens it.
     *
     * @return the opened {@link NettyChannel}
     * @throws Exception if opening the channel fails
     */
    @Override
    public Object makeObject() throws Exception {
        NettyChannel created = new NettyChannel(nettyClient);
        created.open();
        return created;
    }

}

6.NettyChannel

public  class  NettyChannel  implements  com.weibo.api.motan.transport.Channel {
     private  volatile  ChannelState state = ChannelState.UNINIT;
 
     private  NettyClient nettyClient;
 
     private  org.jboss.netty.channel.Channel channel =  null ;
 
     private  InetSocketAddress remoteAddress =  null ;
     private  InetSocketAddress localAddress =  null ;
 
     public  NettyChannel(NettyClient nettyClient) {
         this .nettyClient = nettyClient;
         this .remoteAddress =  new  InetSocketAddress(nettyClient.getUrl().getHost(), nettyClient.getUrl().getPort()); //服务器host和port
     }
     public  synchronized  boolean  open() { //打开连接
         if  (isAvailable()) {
             LoggerUtil.warn( "the channel already open, local: "  + localAddress +  " remote: "  + remoteAddress +  " url: "
                     + nettyClient.getUrl().getUri());
             return  true ;
         }
 
         try  {
             ChannelFuture channleFuture = nettyClient.getBootstrap().connect(
                     new  InetSocketAddress(nettyClient.getUrl().getHost(), nettyClient.getUrl().getPort())); //打开连接
 
             long  start = System.currentTimeMillis();
 
             int  timeout = nettyClient.getUrl().getIntParameter(URLParamType.connectTimeout.getName(), URLParamType.connectTimeout.getIntValue());
             if  (timeout <=  0 ) {
                 throw  new  MotanFrameworkException( "NettyClient init Error: timeout("  + timeout +  ") <= 0 is forbid." ,
                         MotanErrorMsgConstant.FRAMEWORK_INIT_ERROR);
             }
             // 不去依赖于connectTimeout
             boolean  result = channleFuture.awaitUninterruptibly(timeout, TimeUnit.MILLISECONDS);
             boolean  success = channleFuture.isSuccess();
 
             if  (result && success) {
                 channel = channleFuture.getChannel();
                 if  (channel.getLocalAddress() !=  null  && channel.getLocalAddress()  instanceof  InetSocketAddress) {
                     localAddress = (InetSocketAddress) channel.getLocalAddress();
                 }
 
                 state = ChannelState.ALIVE;
                 return  true ;
             }
             boolean  connected =  false ;
             if (channleFuture.getChannel() !=  null ){
                 connected = channleFuture.getChannel().isConnected();
             }
 
             if  (channleFuture.getCause() !=  null ) {
                 channleFuture.cancel();
                 throw  new  MotanServiceException( "NettyChannel failed to connect to server, url: "
                         + nettyClient.getUrl().getUri()+  ", result: "  + result +  ", success: "  + success +  ", connected: "  + connected, channleFuture.getCause());
             }  else  {
                 channleFuture.cancel();
                 throw  new  MotanServiceException( "NettyChannel connect to server timeout url: "
                         + nettyClient.getUrl().getUri() +  ", cost: "  + (System.currentTimeMillis() - start) +  ", result: "  + result +  ", success: "  + success +  ", connected: "  + connected);
             }
         }  catch  (MotanServiceException e) {
             throw  e;
         }  catch  (Exception e) {
             throw  new  MotanServiceException( "NettyChannel failed to connect to server, url: "
                     + nettyClient.getUrl().getUri(), e);
         }  finally  {
             if  (!state.isAliveState()) {
                 nettyClient.incrErrorCount(); //增加错误次数
             }
         }
     }
}

本章知识点总结:

1.使用netty作为底层通讯框架;

2.每个referer对应一个NettyClient,每个NettyClient通过连接池维护多个NettyChannel,NettyChannel由工厂类NettyChannelFactory创建;

3.在未开启lazyInit的情况下,每个client在初始化时会预先创建minClientConnection个长连接(上文配置中为2个),具体数量由配置决定;

4.使用了GenericObjectPool来作为连接池;

5.当每个client的连续调用出错数达到阈值时,将自动设置此client为不可用;

6.心跳操作由客户端发起,只针对不可用状态的client。

下一篇:七:序列化

相关文章