文章13 | 阅读 5628 | 点赞0
定义对Channel中IO数据的处理逻辑,主要是面向业务逻辑的处理,只需按业务需要的数据处理顺序,通过调用ChannelPipeline的addLast等方法,将多个ChannelHandler添加到ChannelPipeline中即可,无需关心多个ChannelHandler在ChannelPipeline中如何联系起来的,这个工作由ChannelHandlerContext完成。
实现Netty内部各组件,如ChannelHandler、ChannelPipeline,合作完成数据处理的基础,核心功能是实现ChannelHandler和ChannelPipeline以及其他ChannelHandler之间通信、交互。在ChannelPipeline中,每个ChannelHandler对象实例,对应一个ChannelHandlerContext,由该ChannelHandlerContext代表ChannelHandler或者说ChannelHandler委托ChannelHandlerContext,在ChannelPipeline中对Channel的数据进行处理(实际底层仍旧是使用ChannelHandler进行数据处理),以及与其他ChannelHandler的交互通信。具体表现为:
(1)ChannelHandlers之间的交互:调用内部关联的ChannelHandler实例处理数据,或者将数据交给ChannelPipeline中的下一个ChannelHandler处理;
(2)在运行时,可以按需要动态对ChannelPipeline进行修改,如添加、删除ChannelHandler。
故在ChannelHandlerContext需要包含ChannelHandler和ChannelPipeline的引用:
同时维护当前ChannelHandler在ChannelPipeline中相邻的,即pre(上一个)和next(下一个)的ChannelHandler的引用,而pre和next的赋值维护是由ChannelPipeline完成的(在addLast等方法维护):
由于ChannelHandler通常在每个方法中使用ChannelHandlerContext和需要处理的数据msg作为参数,即使用局部方法保证线程安全,如图:
同时添加到ChannelPipeline中就会创建一个独立ChannelHandlerContext实例与该ChannelHandler关联,故同一个ChannelHandler实例可以添加到多个ChannelPipeline或者在一个ChannelPipeline中添加多次,而涉及到线程安全的有状态数据则使用ChannelHandlerContext的attr()方法结合AttributeKey来实现ChannelHandler在多个ChannelPipeline中互不影响。具体看下面的详细分析。
上图第三个红框的实现如下图:获取下一个outbound,然后调用invokeWriteAndFlush或invokeWrite,这两个方法在上图实现的,如invokeWrite:如果channelHandlerContext添加过channelHandler,则调用invokeWrite0方法,将channelHandler强制转换为ChannelOutboundHandler,然后执行write,这个write的逻辑是用户自定义了的(在write中,用户可以定义直接写到底层socket,如调用ctx.channel().write(msg),或者调用,如ctx.write(msg),传给下一个ChannelOutboundHandler继续处理,这也是调用Channel的write和ChannelHandlerContext的write的区别);
否则传给下一个ChannelOutboundHandler处理。
FactorialHandler fh = new FactorialHandler();
// 添加到一个ChannelPipeline多次
ChannelPipeline p1 = Channels.pipeline();
p1.addLast("f1", fh);
p1.addLast("f2", fh);
// 添加到多个ChannelPipeline,即p1, p2
ChannelPipeline p2 = Channels.pipeline();
p2.addLast("f3", fh);
p2.addLast("f4", fh);
状态数据的管理
以同一个ChannelHandler添加到多个ChannelPipeline为例分析:
由于多个ChannelPipeline或者说多个Channel,共享同一个ChannelHandler实例,由该ChannelHandler处理这些不同Channel中流通的数据,故如果在处理过程中需要记录与每个Channel特定的数据,多个Channel互相独立,即维护有状态的数据,则需要使用AttributeKey指定状态数据的key和value的类型,使用ChannelHandlerContext的attr方法,将该key对应的数据存到ChannelHandlerContext,因为ChannelHandlerContext是每次(添加到同一个ChannelPipeline多次或者添加到多个ChannelPipeline)添加到ChannelPipeline中都会创建一个独立的实例的。
示例:该ChannelHandler对应上面FactorialHandler
public class FactorialHandler extends ChannelInboundHandlerAdapter {
// 有状态的数据counter
private final AttributeKey<Integer> counter = AttributeKey.valueOf("counter");
// This handler will receive a sequence of increasing integers starting
// from 1.
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
// 获取当前ChannelHandlerContext特定的counter
Integer a = ctx.attr(counter).get();
if (a == null) {
a = 1;
}
attr.set(a * (Integer) msg);
}
}
版权说明 : 本文为转载文章, 版权归原作者所有 版权申明
原文链接 : https://xieyizun.blog.csdn.net/article/details/85212804
内容来源于网络,如有侵权,请联系作者删除!