Motan's serialization supports two protocols: JSON and hessian2. The main classes and interfaces involved are FastJsonSerialization, Hessian2Serialization, Serialization, Codec, AbstractCodec, NettyDecoder, NettyEncoder, DefaultRpcCodec, and CompressRpcCodec.
1. FastJsonSerialization uses JSON as the data exchange format, and Hessian2Serialization uses hessian2
@SpiMeta(name = "hessian2")
public class Hessian2Serialization implements Serialization {
    @Override
    public byte[] serialize(Object data) throws IOException { // serialize with hessian2
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        Hessian2Output out = new Hessian2Output(bos);
        out.writeObject(data);
        out.flush();
        return bos.toByteArray();
    }

    @SuppressWarnings("unchecked")
    @Override
    public <T> T deserialize(byte[] data, Class<T> clz) throws IOException { // deserialize with hessian2
        Hessian2Input input = new Hessian2Input(new ByteArrayInputStream(data));
        return (T) input.readObject(clz);
    }
}
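
To make the contract concrete, here is a minimal round-trip sketch. It mirrors the two methods visible in the excerpt above (serialize and deserialize), but the implementation is a hypothetical stand-in named JdkSerialization that uses plain JDK serialization rather than hessian2, so that it runs without any extra library. It is not part of Motan.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;

// Same two-method shape as the Serialization interface implemented in the excerpt.
interface Serialization {
    byte[] serialize(Object data) throws IOException;

    <T> T deserialize(byte[] data, Class<T> clz) throws IOException;
}

// Hypothetical stand-in: JDK serialization instead of hessian2 (illustration only).
class JdkSerialization implements Serialization {
    @Override
    public byte[] serialize(Object data) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        // Closing the ObjectOutputStream flushes everything into bos.
        try (ObjectOutputStream out = new ObjectOutputStream(bos)) {
            out.writeObject(data);
        }
        return bos.toByteArray();
    }

    @Override
    public <T> T deserialize(byte[] data, Class<T> clz) throws IOException {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data))) {
            return clz.cast(in.readObject());
        } catch (ClassNotFoundException e) {
            throw new IOException("class not found while deserializing", e);
        }
    }
}

class SerializationDemo {
    public static void main(String[] args) throws IOException {
        Serialization serialization = new JdkSerialization();
        byte[] bytes = serialization.serialize("hello motan");
        String back = serialization.deserialize(bytes, String.class);
        System.out.println(back);
    }
}
```

Because callers only depend on the interface, swapping the wire format means swapping the implementation and nothing else, which is what the two Motan classes named above do for JSON and hessian2.
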
2. Motan supports both compressed and uncompressed encoding
public byte[] encode(Channel channel, Object message) throws IOException {
    if (needEncodeV1(message)) { // decide which codec version to use; decode does the same
        return v1Codec.encode(channel, message);
    } else {
        // use the v2 (compressing) encode
        return encodeV2(channel, message);
    }
}
public byte[] encodeV2(Channel channel, Object message) throws IOException {
    try {
        if (message instanceof Request) {
            return encodeRequest(channel, (Request) message); // serialize and compress the request
        } else if (message instanceof Response) {
            return encodeResponse(channel, (Response) message); // serialize and compress the response
        }
    } catch (Exception e) {
        if (ExceptionUtil.isMotanException(e)) {
            throw (RuntimeException) e;
        } else {
            throw new MotanFrameworkException("encode error: isResponse=" + (message instanceof Response), e,
                    MotanErrorMsgConstant.FRAMEWORK_ENCODE_ERROR);
        }
    }
    throw new MotanFrameworkException("encode error: message type not support, " + message.getClass(),
            MotanErrorMsgConstant.FRAMEWORK_ENCODE_ERROR);
}
private byte[] encodeRequest(Channel channel, Request request) throws IOException {
    ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
    ObjectOutput output = createOutput(outputStream);
    addMethodInfo(output, request);
    // look up the Serialization implementation (hessian2 or fastjson) configured on the URL
    Serialization serialization = ExtensionLoader.getExtensionLoader(Serialization.class).getExtension(
            channel.getUrl().getParameter(URLParamType.serialize.getName(), URLParamType.serialize.getValue()));
    if (request.getArguments() != null && request.getArguments().length > 0) {
        for (Object obj : request.getArguments()) {
            serialize(output, obj, serialization); // serialize each argument
        }
    }
    if (request.getAttachments() == null || request.getAttachments().isEmpty()) {
        // empty attachments
        output.writeShort(0);
    } else {
        // copy the attachments before the signature replacement, so the original request
        // is left unchanged if the call fails and is retried
        Map<String, String> attachments = copyMap(request.getAttachments());
        replaceAttachmentParamsBySign(channel, attachments);
        addAttachment(output, attachments);
    }
    output.flush();
    byte[] body = outputStream.toByteArray();
    byte flag = MotanConstants.FLAG_REQUEST;
    output.close();
    Boolean usegz = channel.getUrl().getBooleanParameter(URLParamType.usegz.getName(), URLParamType.usegz.getBooleanValue());
    int minGzSize = channel.getUrl().getIntParameter(URLParamType.mingzSize.getName(), URLParamType.mingzSize.getIntValue());
    return encode(compress(body, usegz, minGzSize), flag, request.getRequestId()); // compress, then add the protocol header
}
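
The body of compress(body, usegz, minGzSize) is not shown in the excerpt. The sketch below is an assumption about what a threshold-based helper of that shape might do: gzip the body only when compression is enabled and the body is larger than the minimum size. The class GzipThresholdSketch and the methods compressIfLarge and gunzip are hypothetical names, and the sketch leaves out how the receiver learns whether a body was compressed.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.Arrays;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

class GzipThresholdSketch {
    // Hypothetical helper: gzip only when enabled and the body exceeds minGzSize bytes.
    static byte[] compressIfLarge(byte[] body, boolean useGz, int minGzSize) throws IOException {
        if (!useGz || body.length <= minGzSize) {
            return body; // small or compression disabled: pass through unchanged
        }
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        // Closing the GZIPOutputStream writes the gzip trailer into bos.
        try (GZIPOutputStream gz = new GZIPOutputStream(bos)) {
            gz.write(body);
        }
        return bos.toByteArray();
    }

    // Inverse operation, used here only to check the round trip.
    static byte[] gunzip(byte[] data) throws IOException {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        try (GZIPInputStream in = new GZIPInputStream(new ByteArrayInputStream(data))) {
            byte[] chunk = new byte[1024];
            int n;
            while ((n = in.read(chunk)) != -1) {
                out.write(chunk, 0, n);
            }
        }
        return out.toByteArray();
    }

    public static void main(String[] args) throws IOException {
        byte[] large = new byte[4000];
        Arrays.fill(large, (byte) 'a');
        byte[] packed = compressIfLarge(large, true, 1000);
        System.out.println("original=" + large.length + " bytes, compressed=" + packed.length + " bytes");
        System.out.println("round trip ok: " + Arrays.equals(large, gunzip(packed)));
    }
}
```

A size threshold like this avoids paying the gzip overhead on small payloads, where the compressed form can be larger than the input.
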
3. NettyDecoder and NettyEncoder integrate the codec with the netty framework
protected Object encode(ChannelHandlerContext ctx, Channel nettyChannel, Object message) throws Exception {
    long requestId = getRequestId(message); // get the requestId
    byte[] data = null;
    if (message instanceof Response) {
        try {
            data = codec.encode(client, message);
        } catch (Exception e) {
            LoggerUtil.error("NettyEncoder encode error, identity=" + client.getUrl().getIdentity(), e);
            Response response = buildExceptionResponse(requestId, e);
            data = codec.encode(client, response);
        }
    } else {
        data = codec.encode(client, message); // encode with DefaultRpcCodec or the compressing codec
    }
    byte[] transportHeader = new byte[MotanConstants.NETTY_HEADER];
    ByteUtil.short2bytes(MotanConstants.NETTY_MAGIC_TYPE, transportHeader, 0);
    transportHeader[3] = getType(message);
    ByteUtil.long2bytes(getRequestId(message), transportHeader, 4);
    ByteUtil.int2bytes(data.length, transportHeader, 12);
    return ChannelBuffers.wrappedBuffer(transportHeader, data);
}
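
The offsets used above imply a transport header of at least 16 bytes: a 2-byte magic at offset 0, a type byte at offset 3, an 8-byte requestId at offset 4, and a 4-byte body length at offset 12. The sketch below rebuilds that layout with java.nio.ByteBuffer to show how a receiver could read the fields back. The header length of 16, the magic value, and big-endian byte order are assumptions for illustration (the constants and ByteUtil are not shown in the excerpt), and TransportHeaderSketch is a hypothetical class.

```java
import java.nio.ByteBuffer;

class TransportHeaderSketch {
    static final int HEADER_LENGTH = 16;       // assumed; the offsets in the excerpt need at least 16 bytes
    static final short MAGIC = (short) 0xF1F1; // assumed magic value, for illustration only

    // Build header + body using the same offsets as the encoder excerpt.
    static byte[] frame(byte type, long requestId, byte[] body) {
        ByteBuffer buf = ByteBuffer.allocate(HEADER_LENGTH + body.length); // big-endian by default
        buf.putShort(0, MAGIC);      // bytes 0-1: magic
        buf.put(3, type);            // byte 3: message type (byte 2 is left as zero)
        buf.putLong(4, requestId);   // bytes 4-11: request id
        buf.putInt(12, body.length); // bytes 12-15: body length
        buf.position(HEADER_LENGTH);
        buf.put(body);               // body follows the header
        return buf.array();
    }

    public static void main(String[] args) {
        byte[] framed = frame((byte) 1, 42L, new byte[] {9, 8, 7});
        ByteBuffer in = ByteBuffer.wrap(framed);
        System.out.println("magic ok:  " + (in.getShort(0) == MAGIC));
        System.out.println("type:      " + in.get(3));
        System.out.println("requestId: " + in.getLong(4));
        System.out.println("bodyLen:   " + in.getInt(12));
    }
}
```

Carrying the body length in a fixed-size header is what lets a decoder on a stream transport know how many bytes to wait for before handing a complete message to the codec.
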
Original article: https://blog.csdn.net/a1439226817/article/details/68483472