1) 服务端启动,客户端建立连接,连接的目的是互相发送消息。
2) 如果客户端在工作,服务端一定能收到数据,如果客户端空闲,服务端会出现资源浪费。
3) 服务端需要一种检测机制,验证客户端的活跃状态,不活跃则关闭。
1) 客户端向服务端发送 “I am alive” , sleep一个随机时间,模拟空闲状态
2) 服务端收到消息后,返回“over”, 客户端有空闲,记录空闲次数
3) 设定阈值,达到阈值时主动关闭连接
public class HreatBeatServer {
public static void main(String[] args) {
//创建两个Reactor 构建主从 Reactor 模型
//用于处理 连接和读写事件 , 无限循环组(线程池)
//管理 channel 监听事件
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
// 我们需要一个服务端引导程序来开启服务端。
ServerBootstrap serverBootstrap = new ServerBootstrap();
//将主从 Reactor 入参,设置当前参数
//这个方法返回的事对象本身,我们可以点出其他方法, 这种返回类型为对象自身 提供了 链式编程的方式
serverBootstrap.group(bossGroup, workerGroup)
//我们需要设置 channel 的 类型
//对应的是 netty NIO BIO
//NioServerSocketChannel <== ServerSocketChannel <== ServerSocket
.channel(NioServerSocketChannel.class)
//设置当前通道的处理器,使用Netty提供的日志打印处理器
.handler(new LoggingHandler(LogLevel.INFO))
//定义客户端连接处理的使用
//此方法需要设置参数 ChannelInitializer 通道初始化器
//初始化 要处理客户端 通道, 所以泛型设置为 SocketChannel
//此类 为抽象类 需要实现其抽象方法 initchannel (alt+enter)快捷键
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
//通过channel 获取管道 pipeline
// 通道代表我们连接的角色, 管道代表处理业务得逻辑管理
// 管道相当于 链表,可以将不同的处理器连接起来,管理处理器的顺序
// 使用时 常常使用的事尾插法, addList 将加入到尾部
ChannelPipeline pipeline = socketChannel.pipeline();
pipeline.addLast(new StringDecoder());
pipeline.addLast(new StringEncoder());
/* * 使用心跳检测处理器 * 读空闲 写空闲 读写空闲 的超时时间 * 最后一个参数是 时间的单位 * IdleStateHandler发现有空闲的时候 会触发 IdleStateEvent时间 * 他会把事件推送给下一个 handler的指定方法 userEventTriggered 去处理 * */
pipeline.addLast(new IdleStateHandler(5, 10, 20, TimeUnit.SECONDS));
socketChannel.pipeline().addLast(new HreatBeatServerHandler());
}
});
System.out.println("服务端初始化完成");
// 设置并启动端口号,但需要使用sync 异步启动
try {
ChannelFuture future = serverBootstrap.bind(2020).sync();
// 将关闭通道的方式也设置为异步的
// 阻塞Finally中的代码执行
future.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
// 优雅方式都关闭
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
IdleStateHandler , 是netty提供的处理器
1)超过多长时间没有读 readerIdleTime
2) 超过多长时间没有写 writerIdleTime
3) 超过多长时间没有读和写 allIdleTime
底层实现检测的是 IdleStateEvent事件,通过管道传递给下一个handler处理,处理方法是userEventTriggered。
public class HreatBeatServerHandler extends SimpleChannelInboundHandler<String> {
private int times;
@Override
protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
if ("I am alive".equals(msg)) {
ctx.writeAndFlush(Unpooled.copiedBuffer("over", CharsetUtil.UTF_8));
}
}
//处理心跳检测事件的方法
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
IdleStateEvent event = (IdleStateEvent) evt;
String eventDesc = null;
switch (event.state()) {
case READER_IDLE:
eventDesc = "读空闲";
break;
case WRITER_IDLE:
eventDesc = "写空闲";
break;
case ALL_IDLE:
eventDesc = "读写空闲";
break;
}
System.out.println(ctx.channel().remoteAddress() + "发生超时事件--" + eventDesc);
times++;
if (times > 3) {
System.out.println("空闲次数超过三次 关闭连接");
ctx.writeAndFlush("you are out");
ctx.channel().close();
}
//super.userEventTriggered(ctx, evt);
}
}
其中IdleStateEvent事件,分为READER_IDLE、WRITER_IDLE、ALL_IDLE三大类
客户端不断循环给服务端发消息确认存活的期间 线程睡眠 模拟失去心跳场景
package com.hyc.netty.Hreatbeat;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import java.util.Random;
public class HreatbeatClient {
public static void main(String[] args) {
//客户端只需要一个事件循环组
EventLoopGroup group = new NioEventLoopGroup();
// 客户端启动的对象
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(group)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
ChannelPipeline pipeline = socketChannel.pipeline();
pipeline.addLast(new StringDecoder());
pipeline.addLast(new StringEncoder());
socketChannel.pipeline().addLast(new HreatbeatClientHandler());
}
});
System.out.println("客户端初始化完成");
try {
ChannelFuture future = bootstrap.connect("127.0.0.1", 2020).sync();
String data = "I am alive";
while (future.channel().isActive()) {
//模拟空闲状态
int num = new Random().nextInt(10);
Thread.sleep(num * 1000);
future.channel().writeAndFlush(data);
}
//future.channel().closeFuture().sync();
} catch (Exception e) {
e.printStackTrace();
} finally {
group.shutdownGracefully();
}
}
static class HreatbeatClientHandler extends SimpleChannelInboundHandler<String> {
@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, String s) throws Exception {
System.out.println("server data:" + s);
if ("you are out".equals(s)) {
System.out.println("关闭");
channelHandlerContext.channel().close();
}
}
}
}
客户端随机线程睡眠 一旦接受到 服务端返回的you are out
代表空闲次数超过了 3次 则关闭客户端连接
版权说明 : 本文为转载文章, 版权归原作者所有 版权申明
原文链接 : https://blog.csdn.net/doomwatcher/article/details/121639687
内容来源于网络,如有侵权,请联系作者删除!