Netty是一个基于Java的高性能的网络框架,基于TCP和UDP,是对Java中NIO的一个封装。
我们可以根据自己的需求快速的开发一款自己需要的应用层协议。
EventLoopGroup
EventLoopGroup 是 EventLoop的一个数组,默认为CPU个数*2,也可以自定义。
EventLoop
一个单线程的Executor,用来绑定一个selector,可以同时绑定多个Channel,读取多个Channel的事件,主要有两种任务:
1、查询selector中监视的channel的事件,并执行对应的handler的方法
2、执行用户的自定义任务。 用户通过channel.eventLoop().execute(Runnable runnable)放入的自定义任务,也可以放入定时任务。
这两个任务是是在一个线程中执行的,通过一个ratio变量来控制二者执行的时间片占比
EventLoop中还有一个executor的属性,这个executor用来执行EventLoop的初始化。如果EventLoop还未初始化,executor会新生成一个Thread来启动EventLoop
当调用EventLoop.execute(Runnable runnable)的时候,如果还没有启动对应的线程,就会利用executor来启动
Channel
用来接受和写入对应的数据,对应于之前的socket。
Handler
一个Channel对应着多个Handler,Handler分为InboundHandler和OutboundHandler两种。
主要方法有read()、write()等方法。
当Channel接收数据的时候,InboundHandler会对其进行处理
当Channel向外发送数据的时候,OutBoundHandler会对其进行处理
Pipeline
一个Channel对应着有一个Pipeline,Pipeline是ChannelHandler的一个链表。
当接收数据的时候,Pipeline会从head 到 tail 依次执行对应InboundHanlder的方法
当发送数据的时候,Pipeline会从tail 到 tail 依次执行对应的OutBoundHandler的方法
Netty的线程模型也是基于Reactor的,如果不了解Reactor模式的,可以看我之前写的文章Reactor设计模式
Netty中分为ServerBootstrap 和 Bootstrap。
ServerBootstrap 是服务器创建的引导类。
其中需要传入两个EventLoopGroup,分别为parentGroup和childGroup。
parentGroup 用来 处理 新channel的连接,childGroup 用来 处理 channel的读取等操作
下面说一下EventLoopEvent的创建流程
如上图所示,executor被初始化成一个ThreadPerTaskExecutor, execute()的时候都会新开一个线程执行对应的runnable。
我们注意下children这个变量,就是对应的EventLoop数组,
对应的newChild(executor,args)就是生成一个EventLoop对象。
EventLoopGroup的核心就是EventLoop数组。
EventLoop对应着一个线程和selector,可以监听多个channel,ServerChannel通过轮询的方式将新连接的channel分配给EventLoop数组中的EventLoop
再说下init(Channel )
void init(Channel channel) throws Exception {
ChannelPipeline p = channel.pipeline();
p.addLast(new ChannelInitializer<Channel>() {
@Override
public void initChannel(final Channel ch) throws Exception {
final ChannelPipeline pipeline = ch.pipeline();
ChannelHandler handler = config.handler();
if (handler != null) {
pipeline.addLast(handler);
}
ch.eventLoop().execute(new Runnable() {
@Override
public void run() {
//这里,加入了对应的新建连接对应的处理handler
pipeline.addLast(new ServerBootstrapAcceptor(
ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
}
});
}
});
}
ServerBootstrapAcceptor 对应的 channelRead()如下
public void channelRead(ChannelHandlerContext ctx, Object msg) {
final Channel child = (Channel) msg;
//加入对应的hanlder,也是我们初始化的时候定义的
child.pipeline().addLast(childHandler);
setChannelOptions(child, childOptions, logger);
for (Entry<AttributeKey<?>, Object> e: childAttrs) {
child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());
}
try {
//重点,将channel注册到childGroup中
childGroup.register(child).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) {
forceClose(child, future.cause());
}
}
});
} catch (Throwable t) {
forceClose(child, t);
}
}
next()就是查找childGroup的下一个EventLoop,也就是轮询的使用EventLoop
然后转到对应的EventLoop的register()方法
到这步熟悉了吧,利用unsafe()将channel和对应的eventLoop的Selector绑定在一起。
Netty中常用的就是NioEventLoop,我们看下它的run()方法
protected void run() {
for (;;) {
try {
final int ioRatio = this.ioRatio;
if (ioRatio == 100) {
try {
processSelectedKeys();
} finally {
// Ensure we always run tasks.
runAllTasks();
}
} else {
final long ioStartTime = System.nanoTime();
try {
processSelectedKeys();
} finally {
final long ioTime = System.nanoTime() - ioStartTime;
runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
}
}
} catch (Throwable t) {
handleLoopException(t);
}
}
}
这里我只列出了核心代码,可以看出EventLoop开了一个死循环,做了两个事情
1、processSelectedKeys()
由上图所示,processSelectedKeys()主要是用来处理selector监听的channel。
如上图所示,通过unsafe.read()来读取对应的数据
看上图,来到了我们熟悉的pipeline
executor.isEventLoop()是判断当前线程是否是EventLoop绑定的线程,如果是就直接执行invokeChannelRead(m).
如果不是,就将其包装为runnable加入到 taskQueue中。
因为read()只能由EventLoop线程发起,所以会直接执行invokeChannelRead(m).
如果handler对应的处理过程比较长,建议在handler中使用线程池来执行对应的任务,否则会影响其他连接的业务处理
如上图所示,调用了hanlder().channelRead(this,msg),这就是调用了对应的handler。
但是并未传递到后面的handler,当想要将对应的数据传递到后续的handler的时候,需要用户手动的调用
ctx.fireChannelRead(msg);
2、runAllTasks()
在InboundHandler和OutboundHandler的对应的回调方法中,暴露了ChannelHandlerContext,用户可以通过
ctx.channel().eventLoop().execute().execute(Runnable runnable)
来提交对应的任务,对应的任务会加入到EventLoop的taskQueue中。
客户端的Bootstrap是ServerBootstrap的简化版本,只有一个childGroup(), 也是初始化对应的channel,然后绑定到一个EventLoop中。
版权说明 : 本文为转载文章, 版权归原作者所有 版权申明
原文链接 : https://blog.csdn.net/qq_40276626/article/details/121453415
内容来源于网络,如有侵权,请联系作者删除!