本章将分析motan的序列化和底层通信相关部分的代码。
1.在上一章中,有一个getrefers的操作,来获取所有服务器的引用,每个服务器的引用都是由DefaultRpcReferer来创建的
public DefaultRpcReferer(Class<T>clz, URL url, URL serviceUrl) {
super(clz, url, serviceUrl);
endpointFactory =ExtensionLoader.getExtensionLoader(EndpointFactory.class).getExtension(
url.getParameter(URLParamType.endpointFactory.getName(), URLParamType.endpointFactory.getValue()));//通过spi加载NettyEndpointFactory
client =endpointFactory.createClient(url);//创建client
}
2.NettyClient的创建过程及源码分析
publicClient createClient(URL url) {
LoggerUtil.info(this.getClass().getSimpleName() + " create client: url={}", url);
returncreateClient(url, heartbeatClientEndpointManager);//创建client
}
privateClient createClient(URL url, EndpointManager endpointManager) {
Client client =innerCreateClient(url);//调用NettyEndpointFactory的创建client的方法
endpointManager.addEndpoint(client);//添加心跳管理
returnclient;
}
protectedClient innerCreateClient(URL url) {
return newNettyClient(url);//返回NettyClient对象
}
publicNettyClient(URL url) {
super(url);
maxClientConnection =url.getIntParameter(URLParamType.maxClientConnection.getName(),
URLParamType.maxClientConnection.getIntValue());
timeMonitorFuture =scheduledExecutor.scheduleWithFixedDelay(
new TimeoutMonitor("timeout_monitor_" + url.getHost() + "_" +url.getPort()),
MotanConstants.NETTY_TIMEOUT_TIMER_PERIOD, MotanConstants.NETTY_TIMEOUT_TIMER_PERIOD,
TimeUnit.MILLISECONDS);
LoggerUtil.info("client's:"+url.getUri());
}
3.Netty相关的连接建立是通过open()方法进行的
public synchronized boolean open() {
if (isAvailable()) {
return true ;
}
// 初始化netty client bootstrap
initClientBootstrap();
// 初始化连接池
initPool();
LoggerUtil.info( "NettyClient finish Open: url={}" , url);
// 注册统计回调
StatsUtil.registryStatisticCallback( this );
// 设置可用状态
state = ChannelState.ALIVE;
return state.isAliveState();
}
private void initClientBootstrap() {
bootstrap = new ClientBootstrap(channelFactory);
bootstrap.setOption( "keepAlive" , true );
bootstrap.setOption( "tcpNoDelay" , true );
// 实际上,极端情况下,connectTimeout会达到500ms,因为netty nio的实现中,是依赖BossThread来控制超时,
// 如果为了严格意义的timeout,那么需要应用端进行控制。
int timeout = getUrl().getIntParameter(URLParamType.requestTimeout.getName(), URLParamType.requestTimeout.getIntValue());
if (timeout <= 0 ) {
throw new MotanFrameworkException( "NettyClient init Error: timeout(" + timeout + ") <= 0 is forbid." ,
MotanErrorMsgConstant.FRAMEWORK_INIT_ERROR);
}
bootstrap.setOption( "connectTimeoutMillis" , timeout);
// 最大响应包限制
final int maxContentLength = url.getIntParameter(URLParamType.maxContentLength.getName(),
URLParamType.maxContentLength.getIntValue());
bootstrap.setPipelineFactory( new ChannelPipelineFactory() {
public ChannelPipeline getPipeline() {
ChannelPipeline pipeline = Channels.pipeline();
pipeline.addLast( "decoder" , new NettyDecoder(codec, NettyClient. this , maxContentLength)); //解码器
pipeline.addLast( "encoder" , new NettyEncoder(codec, NettyClient. this )); //编码器
pipeline.addLast( "handler" , new NettyChannelHandler(NettyClient. this , new MessageHandler() { //业务处理的handler
@Override
public Object handle(Channel channel, Object message) {
Response response = (Response) message;
NettyResponseFuture responseFuture = NettyClient. this .removeCallback(response.getRequestId()); //移调异步处理response信息
if (responseFuture == null ) {
LoggerUtil.warn(
"NettyClient has response from server, but resonseFuture not exist, requestId={}" ,
response.getRequestId());
return null ;
}
if (response.getException() != null ) {
responseFuture.onFailure(response);
} else {
responseFuture.onSuccess(response);
}
return null ;
}
}));
return pipeline;
}
});
}
4.连接池
protected void initPool() {
poolConfig = new GenericObjectPool.Config(); //使用了GenericObjectPool作为连接池
poolConfig.minIdle =
url.getIntParameter(URLParamType.minClientConnection.getName(), URLParamType.minClientConnection.getIntValue()); //最小连接数,配置中为2个
poolConfig.maxIdle =
url.getIntParameter(URLParamType.maxClientConnection.getName(), URLParamType.maxClientConnection.getIntValue()); //最大连接数,配置中为10个
poolConfig.maxActive = poolConfig.maxIdle;
poolConfig.maxWait = url.getIntParameter(URLParamType.requestTimeout.getName(), URLParamType.requestTimeout.getIntValue());
poolConfig.lifo = url.getBooleanParameter(URLParamType.poolLifo.getName(), URLParamType.poolLifo.getBooleanValue());
poolConfig.minEvictableIdleTimeMillis = defaultMinEvictableIdleTimeMillis;
poolConfig.softMinEvictableIdleTimeMillis = defaultSoftMinEvictableIdleTimeMillis;
poolConfig.timeBetweenEvictionRunsMillis = defaultTimeBetweenEvictionRunsMillis;
factory = createChannelFactory(); //创建chanalfactory
pool = new GenericObjectPool(factory, poolConfig);
boolean lazyInit = url.getBooleanParameter(URLParamType.lazyInit.getName(), URLParamType.lazyInit.getBooleanValue());
if (!lazyInit) {
for ( int i = 0 ; i < poolConfig.minIdle; i++) { //初始化2个长连接
try {
pool.addObject();
LoggerUtil.info( "init client's connection :" +i);
} catch (Exception e) {
LoggerUtil.error( "NettyClient init pool create connect Error: url=" + url.getUri(), e);
}
}
}
}
5.NettyChannelFactory
public class NettyChannelFactory extends BasePoolableObjectFactory {
private String factoryName = "" ;
private NettyClient nettyClient;
public NettyChannelFactory(NettyClient nettyClient) {
super ();
this .nettyClient = nettyClient;
this .factoryName = "NettyChannelFactory_" + nettyClient.getUrl().getHost() + "_"
+ nettyClient.getUrl().getPort();
}
@Override
public Object makeObject() throws Exception { //创建连接时会调用
NettyChannel nettyChannel = new NettyChannel(nettyClient); //创建channel
nettyChannel.open(); //打开channel
return nettyChannel;
}
}
6.NettyChannel
public class NettyChannel implements com.weibo.api.motan.transport.Channel {
private volatile ChannelState state = ChannelState.UNINIT;
private NettyClient nettyClient;
private org.jboss.netty.channel.Channel channel = null ;
private InetSocketAddress remoteAddress = null ;
private InetSocketAddress localAddress = null ;
public NettyChannel(NettyClient nettyClient) {
this .nettyClient = nettyClient;
this .remoteAddress = new InetSocketAddress(nettyClient.getUrl().getHost(), nettyClient.getUrl().getPort()); //服务器host和port
}
public synchronized boolean open() { //打开连接
if (isAvailable()) {
LoggerUtil.warn( "the channel already open, local: " + localAddress + " remote: " + remoteAddress + " url: "
+ nettyClient.getUrl().getUri());
return true ;
}
try {
ChannelFuture channleFuture = nettyClient.getBootstrap().connect(
new InetSocketAddress(nettyClient.getUrl().getHost(), nettyClient.getUrl().getPort())); //打开连接
long start = System.currentTimeMillis();
int timeout = nettyClient.getUrl().getIntParameter(URLParamType.connectTimeout.getName(), URLParamType.connectTimeout.getIntValue());
if (timeout <= 0 ) {
throw new MotanFrameworkException( "NettyClient init Error: timeout(" + timeout + ") <= 0 is forbid." ,
MotanErrorMsgConstant.FRAMEWORK_INIT_ERROR);
}
// 不去依赖于connectTimeout
boolean result = channleFuture.awaitUninterruptibly(timeout, TimeUnit.MILLISECONDS);
boolean success = channleFuture.isSuccess();
if (result && success) {
channel = channleFuture.getChannel();
if (channel.getLocalAddress() != null && channel.getLocalAddress() instanceof InetSocketAddress) {
localAddress = (InetSocketAddress) channel.getLocalAddress();
}
state = ChannelState.ALIVE;
return true ;
}
boolean connected = false ;
if (channleFuture.getChannel() != null ){
connected = channleFuture.getChannel().isConnected();
}
if (channleFuture.getCause() != null ) {
channleFuture.cancel();
throw new MotanServiceException( "NettyChannel failed to connect to server, url: "
+ nettyClient.getUrl().getUri()+ ", result: " + result + ", success: " + success + ", connected: " + connected, channleFuture.getCause());
} else {
channleFuture.cancel();
throw new MotanServiceException( "NettyChannel connect to server timeout url: "
+ nettyClient.getUrl().getUri() + ", cost: " + (System.currentTimeMillis() - start) + ", result: " + result + ", success: " + success + ", connected: " + connected);
}
} catch (MotanServiceException e) {
throw e;
} catch (Exception e) {
throw new MotanServiceException( "NettyChannel failed to connect to server, url: "
+ nettyClient.getUrl().getUri(), e);
} finally {
if (!state.isAliveState()) {
nettyClient.incrErrorCount(); //增加错误次数
}
}
}
}
本章知识点总结:
1.使用netty作为底层通讯框架;
2.每个refer对应一个nettyclient和一个nettychannel,nettychannel是由工厂类创建;
3.每个client在初始化时,最少创建2个长连接,由配置决定;
4.使用了GenericObjectPool来作为连接池;
5.当每个client的连续调用出错数达到阀值时,将自动设置此client为不可用;
6.心跳操作由客户端发起,只针对不可用状态的client。
版权说明 : 本文为转载文章, 版权归原作者所有 版权申明
原文链接 : https://blog.csdn.net/a1439226817/article/details/68483467
内容来源于网络,如有侵权,请联系作者删除!