之前我们分析了客户端调用服务端的源码,但是没有涉及到通讯层和序列化层,本文将之前讲过的内容做一次串联。
1.上层通过动态代理调用refer的call,每个refer又对应一个nettyclient,下面来看一下nettyclient的调用服务端操作
private Response request(Request request, boolean async) throws TransportException {
Channel channel = null ;
Response response = null ;
try {
// return channel or throw exception(timeout or connection_fail)
channel = borrowObject(); //向连接池拿连接
if (channel == null ) {
LoggerUtil.error( "NettyClient borrowObject null: url=" + url.getUri() + " "
+ MotanFrameworkUtil.toString(request));
return null ;
}
// async request
response = channel.request(request); //调用channel的request
// return channel to pool
returnObject(channel); //归还连接
} catch (Exception e) {
LoggerUtil.error(
"NettyClient request Error: url=" + url.getUri() + " " + MotanFrameworkUtil.toString(request), e);
//TODO 对特定的异常回收channel
invalidateObject(channel); //销毁坏的连接
if (e instanceof MotanAbstractException) {
throw (MotanAbstractException) e;
} else {
throw new MotanServiceException( "NettyClient request Error: url=" + url.getUri() + " "
+ MotanFrameworkUtil.toString(request), e);
}
}
// aysnc or sync result
response = asyncResponse(response, async); //处理response
return response;
}
2.nettychannel的request操作
public Response request(Request request) throws TransportException {
int timeout = nettyClient.getUrl().getMethodParameter(request.getMethodName(), request.getParamtersDesc(),
URLParamType.requestTimeout.getName(), URLParamType.requestTimeout.getIntValue());
if (timeout <= 0 ) {
throw new MotanFrameworkException( "NettyClient init Error: timeout(" + timeout + ") <= 0 is forbid." ,
MotanErrorMsgConstant.FRAMEWORK_INIT_ERROR);
}
NettyResponseFuture response = new NettyResponseFuture(request, timeout, this .nettyClient); //创建异步response对象
this .nettyClient.registerCallback(request.getRequestId(), response); //将此response存入到map,处理完后,会移出
ChannelFuture writeFuture = this .channel.write(request); //向服务端传递request对象,写之前会进行序列化的操作
boolean result = writeFuture.awaitUninterruptibly(timeout, TimeUnit.MILLISECONDS); //标识是否成功
if (result && writeFuture.isSuccess()) {
response.addListener( new FutureListener() { //增加response的监听器
@Override
public void operationComplete(Future future) throws Exception {
if (future.isSuccess() || (future.isDone() && ExceptionUtil.isBizException(future.getException()))) {
// 成功的调用
nettyClient.resetErrorCount(); //成功
} else {
// 失败的调用
nettyClient.incrErrorCount(); //对失败次数+1,如果同一个client连续失败达到所有的连接次数时,标识此client不可用,由心跳管理器负责恢复此client的可用状态
}
}
});
return response; //返回此response,此response为异步的response,由业务线程接手后续接收的过程
}
writeFuture.cancel();
response = this .nettyClient.removeCallback(request.getRequestId()); //在map中移出此response
if (response != null ) {
response.cancel();
}
// 失败的调用
nettyClient.incrErrorCount();
if (writeFuture.getCause() != null ) {
throw new MotanServiceException( "NettyChannel send request to server Error: url="
+ nettyClient.getUrl().getUri() + " local=" + localAddress + " "
+ MotanFrameworkUtil.toString(request), writeFuture.getCause());
} else {
throw new MotanServiceException( "NettyChannel send request to server Timeout: url="
+ nettyClient.getUrl().getUri() + " local=" + localAddress + " "
+ MotanFrameworkUtil.toString(request));
}
}
3.异步的response NettyResponseFuture
public Object getValue() {
synchronized (lock) {
if (!isDoing()) {
return getValueOrThrowable(); //返回成功值或失败
}
if (timeout <= 0 ) {
try {
lock.wait(); //未接收完毕则一直等待
} catch (Exception e) {
cancel( new MotanServiceException( "NettyResponseFuture getValue InterruptedException : "
+ MotanFrameworkUtil.toString(request) + " cost="
+ (System.currentTimeMillis() - createTime), e));
}
// don't need to notifylisteners, because onSuccess or
// onFailure or cancel method already call notifylisteners
return getValueOrThrowable();
} else {
long waitTime = timeout - (System.currentTimeMillis() - createTime); //等待的时间
if (waitTime > 0 ) {
for (;;) {
try {
lock.wait(waitTime); //要么被通知,要么超时
} catch (InterruptedException e) {
}
if (!isDoing()) {
break ;
} else {
waitTime = timeout - (System.currentTimeMillis() - createTime);
if (waitTime <= 0 ) {
break ;
}
}
}
}
if (isDoing()) {
timeoutSoCancel();
}
}
return getValueOrThrowable();
}
}
本章知识点:
1.motan通过NettyResponseFuture来实现在框架层面异步处理同一笔业务,提升了框架的性能;
2.对于连续失败的client,进行下线操作。
版权说明 : 本文为转载文章, 版权归原作者所有 版权申明
原文链接 : https://blog.csdn.net/a1439226817/article/details/68483481
内容来源于网络,如有侵权,请联系作者删除!