文章13 | 阅读 5625 | 点赞0
ChannelPipeline是Java拦截器设计模式的一种高级实现方式,在pipeline中通过定义一系列ChannelHandler来处理或拦截Channel中数据的输入输出操作,使得用户可以通过ChannelHandler,定义如何对Channel收到或写出的数据进行处理,以及定义这些ChannelHandler之间的交互合作、对数据的处理顺序。
即由SocketChannel#read(ByteBuffer)的调用开始,读取到对方传递过来的数据,在read中调用ChannelPipeline开始数据的处理。
ChannelPipeline调用ChannelHandlerContext的fireChannelRead(Object)方法(或者其他fireChannelXXX,如下文中的列表)将数据传递到下一个数据输入处理器ChannelInboundHandler,具体为ChannelInboundHandler的channelRead方法(或者其他的channelXXX)。在ChannelInboundHandler的channelRead方法中进行数据处理。
ChannelInboundHandler的channelRead处理完数据之后:
往下传输数据:调用ChannelHandlerContext的fireChannelRead(Object),如ctx.fireChannelRead(msg),将数据交给ChannelInboundHandler链的下一个ChannelInboundHandler,往下传输数据;
释放数据:调用ReferenceCountUtil.release(msg)主动释放数据或者ChannelHandler实现类继承SimpleChannelInboundHandler自动释放数据,不再往下传输,具体在ChannelHandler的文章继续分析。
即由ChannelHandlerContext的write(Object)的调用,写数据传递给对方时。
* I/O Request
* via {@link Channel} or
* {@link ChannelHandlerContext}
* |
* +---------------------------------------------------+---------------+
* | ChannelPipeline | |
* | \|/ |
* | +---------------------+ +-----------+----------+ |
* | | Inbound Handler N | | Outbound Handler 1 | |
* | +----------+----------+ +-----------+----------+ |
* | /|\ | |
* | | \|/ |
* | +----------+----------+ +-----------+----------+ |
* | | Inbound Handler N-1 | | Outbound Handler 2 | |
* | +----------+----------+ +-----------+----------+ |
* | /|\ . |
* | . . |
* | ChannelHandlerContext.fireIN_EVT() ChannelHandlerContext.OUT_EVT()|
* | [ method call] [method call] |
* | . . |
* | . \|/ |
* | +----------+----------+ +-----------+----------+ |
* | | Inbound Handler 2 | | Outbound Handler M-1 | |
* | +----------+----------+ +-----------+----------+ |
* | /|\ | |
* | | \|/ |
* | +----------+----------+ +-----------+----------+ |
* | | Inbound Handler 1 | | Outbound Handler M | |
* | +----------+----------+ +-----------+----------+ |
* | /|\ | |
* +---------------+-----------------------------------+---------------+
* | \|/
* +---------------+-----------------------------------+---------------+
* | | | |
* | [ Socket.read() ] [ Socket.write() ] |
* | |
* | Netty Internal I/O Threads (Transport Implementation) |
* +-------------------------------------------------------------------+
例子:如下如果是输入IO,则依次调用1,2,5,输出IO,则依次调用5,4,3
ChannelPipeline p = ...;
p.addLast("1", new InboundHandlerA());
p.addLast("2", new InboundHandlerB());
p.addLast("3", new OutboundHandlerA());
p.addLast("4", new OutboundHandlerB());
p.addLast("5", new InboundOutboundHandlerX());
ChannelHandlerContext#fireChannelRegistered()
ChannelHandlerContext#fireChannelActive()
ChannelHandlerContext#fireChannelRead(Object)
ChannelHandlerContext#fireChannelReadComplete()
ChannelHandlerContext#fireExceptionCaught(Throwable)
ChannelHandlerContext#fireUserEventTriggered(Object)
ChannelHandlerContext#fireChannelWritabilityChanged()
ChannelHandlerContext#fireChannelInactive()
ChannelHandlerContext#fireChannelUnregistered()
例子:
public class MyInboundHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelActive({@link ChannelHandlerContext} ctx) {
System.out.println("Connected!");
// 传给下一个ChannelInboundHandler
ctx.fireChannelActive();
}
}
ChannelHandlerContext#bind(SocketAddress, ChannelPromise)
ChannelHandlerContext#connect(SocketAddress, SocketAddress, ChannelPromise)
ChannelHandlerContext#write(Object, ChannelPromise)
ChannelHandlerContext#flush()
ChannelHandlerContext#read()
ChannelHandlerContext#disconnect(ChannelPromise)
ChannelHandlerContext#close(ChannelPromise)
ChannelHandlerContext#deregister(ChannelPromise)
例子:
public class MyOutboundHandler extends ChannelOutboundHandlerAdapter {
@Override
public void close(ChannelHandlerContext ctx, ChannelPromise promise) {
System.out.println("Closing ..");
// 传给下一个ChannelOutboundHandler
ctx.close(promise);
}
}
正常情况下,Channel的一次数据IO操作,都是在其所绑定的eventLoop所在的IO线程处理的,如果某个ChannelHandler的处理时间很长,则可以为在添加这个ChannelHandler到pipeline时,指定一个线程池,让这个ChannelHandler在一个额外的线程,而不是eventLoop的线程,这样就不会阻塞eventLoop线程,不会影响到该eventLoop管理的其他Channel的数据IO操作。
// 可以在ChannelInitializer的实现类中,定义一个static的线程池,由所有Channel共享
static final EventExecutorGroup group = new DefaultEventExecutorGroup(16);
// 在initChannel方法为每个新建的Channel的pipeline创建ChannelHandler实例
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast("decoder", new MyProtocolDecoder());
pipeline.addLast("encoder", new MyProtocolEncoder());
pipeline.addLast(group, "handler", new MyBusinessLogicHandler());
版权说明 : 本文为转载文章, 版权归原作者所有 版权申明
原文链接 : https://xieyizun.blog.csdn.net/article/details/85158858
内容来源于网络,如有侵权,请联系作者删除!