Netty是 一个异步事件驱动的网络应用程序框架,用于快速开发可维护的高性能协议服务器和客户端。Netty是基于nio的,它封装了jdk的nio,让我们使用起来更加方法灵活。
两张图让你了解BIO和NIO的区别
传输快是因为,netty的读写都是在堆内存上进行的,netty接收数据时,它会在堆内存之外开辟一块内存,
数据就直接从IO读到了那块内存中去,在netty里面通过ByteBuf可以直接对这些数据进行直接操作,从而加快了传输速度。
①Channel
如上图所示:
②ChannelPipeline和ChannelHandler
ChannelPipeline和ChannelHandler用于channel事件的拦截和处理,Netty使用类似责任链的模式来设计ChannelPipeline和ChannelHandler ChannelPipeline相当于ChannelHandler的容器,channel事件消息在ChannelPipeline中流动和传播,相应的事件能够被ChannelHandler拦截处理、传递、忽略或者终止,如下图所示:
③ChannelHandler
ChannelHandler负责I/O事件或者I/O操作进行拦截和处理,用户可以通过ChannelHandlerAdapter来选择性的实现自己感兴趣的事件拦截和处理。
方法 | 描述 |
---|---|
handlerAdded | 握手建立(Handler被加入Pipeline时触发,仅触发一次) |
channelRegistered | 分配连接线程(channel成功注册到EventLoopGroup的EventLoop) |
channelActive | 通道活跃,可以使用(连接就绪) |
channelRead | 有数据可读时触发 |
channelReadComplete | 有数据可读,并且读完时触发 |
channelInactive | 客户端主动断开连接时触发,底层TCP连接断开,通道不活跃 |
channelUnregistered | 连接关闭后,释放绑定的EventLoop线程 |
handlerRemoved | 握手取消(handler被Pipeline移除时触发) |
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<!--不用加版本号-->
</dependency>
@Component
public class NettybootServerInitConfig implements ApplicationListener<ContextRefreshedEvent> {
@Override
public void onApplicationEvent(ContextRefreshedEvent event) {
if(event.getApplicationContext().getParent() == null){
WssServer.getInstance().start();
}
}
}
@Component
public class WssServer {
/**
* 单例静态内部类
*/
public static class SingletionWSServer {
static final WssServer instance = new WssServer();
}
public static WssServer getInstance() {
return SingletionWSServer.instance;
}
private EventLoopGroup mainGroup;
private EventLoopGroup subGroup;
private ServerBootstrap server;
private ChannelFuture future;
public WssServer() {
mainGroup = new NioEventLoopGroup();
subGroup = new NioEventLoopGroup();
server = new ServerBootstrap();
server.group(mainGroup, subGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new WssServerInitialzer()); // 添加自定义初始化处理器
}
public void start() {
//端口号不要和spring的端口一样
future = this.server.bind(8086);
System.err.println("netty 服务端启动完毕 .....");
}
}
public class WssServerInitialzer extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
// pipeline.addLast(new IdleStateHandler(3,4,5, TimeUnit.SECONDS));
//websocket基于http协议,所以需要http编解码器
pipeline.addLast(new HttpServerCodec());
//添加对于读写大数据流的支持
pipeline.addLast(new ChunkedWriteHandler());
//对httpMessage进行聚合
pipeline.addLast(new HttpObjectAggregator(1024*64));
// ================= 上述是用于支持http协议的 ==============
//websocket 服务器处理的协议,用于给指定的客户端进行连接访问的路由地址
//比如处理一些握手动作(ping,pong)
pipeline.addLast(new WebSocketServerProtocolHandler("/ws"));
//自定义handler
pipeline.addLast(new ChatHandler());
}
}
@ChannelHandler.Sharable
public class ChatHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {
//用于记录和管理所有客户端的channel
private static ChannelGroup clients = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
private WebSocketServerHandshaker handshaker;
/*
* 握手建立
* 客户端创建的时候触发,当客户端连接上服务端之后,就可以获取该channel,然后放到channelGroup中进行统一管理
*/
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
System.out.println("握手建立");
Channel incoming = ctx.channel();
clients.add(incoming);
}
/*
* channelAction
* channel 通道 action 活跃的
* 当客户端主动链接服务端的链接后,这个通道就是活跃的了。
*/
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println(ctx.channel().localAddress().toString() + " 通道活跃");
}
/*
* 功能:读取 h5页面发送过来的信息
*/
@Override
protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg)
throws Exception {
if (msg instanceof FullHttpRequest) {// 如果是HTTP请求,进行HTTP操作
handleHttpRequest(ctx, (FullHttpRequest) msg);
} else if (msg instanceof WebSocketFrame) {// 如果是Websocket请求,则进行websocket操作
System.out.println("websocket接收到消息:"+msg);
handleWebSocketFrame(ctx, msg);
}
}
/*
* channelInactive
* channel 通道 Inactive 不活跃的
* 当客户端主动断开服务端的链接后,这个通道就是不活跃的。
*/
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
System.out.println(ctx.channel().localAddress().toString() + " 通道不活跃!");
}
/*
* 功能:读空闲时移除Channel
*/
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if (evt instanceof IdleStateEvent) {
IdleStateEvent evnet = (IdleStateEvent) evt;
// 判断Channel是否读空闲, 读空闲时移除Channel
if (evnet.state().equals(IdleState.READER_IDLE)) {
System.out.println("读空闲移除Channel");
UserInfoManager.removeChannel(ctx.channel());
}
}
ctx.fireUserEventTriggered(evt);
}
/*
* 握手取消
* 客户端销毁的时候触发
*/
@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
System.out.println("握手取消");
Channel incoming = ctx.channel();
clients.remove(incoming);
}
/**
* 功能:服务端发生异常的操作
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.close();
}
/*
* 功能:处理HTTP的代码
*/
private void handleHttpRequest(ChannelHandlerContext ctx, FullHttpRequest req) throws UnsupportedEncodingException {
// 如果HTTP解码失败,返回HHTP异常
if (req instanceof HttpRequest) {
HttpMethod method = req.method();
// 如果是websocket请求就握手升级
if ("/ws".equalsIgnoreCase(req.uri())) {
System.out.println(" req instanceof HttpRequest");
WebSocketServerHandshakerFactory wsFactory = new WebSocketServerHandshakerFactory(
"", null, false);
handshaker = wsFactory.newHandshaker(req);
if (handshaker == null) {
WebSocketServerHandshakerFactory.sendUnsupportedVersionResponse(ctx.channel());
} else {
handshaker.handshake(ctx.channel(), req);
}
}
}
}
/*
* 处理Websocket的代码
*/
private void handleWebSocketFrame(ChannelHandlerContext ctx, WebSocketFrame frame) {
// 判断是否是关闭链路的指令
if (frame instanceof CloseWebSocketFrame) {
handshaker.close(ctx.channel(), (CloseWebSocketFrame) frame.retain());
return;
}
// 判断是否是Ping消息
if (frame instanceof PingWebSocketFrame) {
ctx.channel().write(new PongWebSocketFrame(frame.content().retain()));
return;
}
// 文本消息,不支持二进制消息
if (frame instanceof TextWebSocketFrame) {
// 返回应答消息
String requestmsg = ((TextWebSocketFrame) frame).text();
System.out.println("收到信息"+requestmsg);
String[] array= requestmsg.split(",");
// 将通道加入通道管理器
UserInfoManager.addChannel(ctx.channel(),array[0]);
if (array.length== 3) {
// 将信息返回给h5
String sender=array[0];
String recevier=array[1];
String message=array[2];
UserInfoManager.broadcastMess(sender,recevier,message);
}
}
}
}
public class UserInfoManager {
private static ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock(true);
private static ConcurrentMap<Channel, UserInfo> userInfos = new ConcurrentHashMap<>();
/**
* 登录注册 channel
*/
public static void addChannel(Channel channel, String uid) {
String remoteAddr = NettyUtil.parseChannelRemoteAddr(channel);
UserInfo userInfo = new UserInfo();
userInfo.setUserId(uid);
userInfo.setAddr(remoteAddr);
userInfo.setChannel(channel);
userInfos.put(channel, userInfo);
}
public static void removeChannel(Channel channel){
userInfos.remove(channel);
System.out.println("移除channel成功");
}
/**
* 普通消息
* @param message
*/
public static void broadcastMess(String sender,String recevier,String message) {
if (!BlankUtil.isBlank(message)) {
try {
rwLock.readLock().lock();
Set<Channel> keySet = userInfos.keySet();
for (Channel ch : keySet) {
UserInfo userInfo = userInfos.get(ch);
if (!userInfo.getUserId().equals(recevier)) {
continue;
}
String backmessage=sender+":"+message;
ch.writeAndFlush(new TextWebSocketFrame(backmessage));
System.out.println("客户端收到消息:"+backmessage);
/*responseToClient(ch,message);*/
}
} finally {
rwLock.readLock().unlock();
}
}
}
public static UserInfo getUserInfo(Channel channel) {
return userInfos.get(channel);
}
}
private String userId; // UID
private String addr; // 地址
private Channel channel;// 通道
public String getAddr() {
return addr;
}
public void setAddr(String addr) {
this.addr = addr;
}
public Channel getChannel() {
return channel;
}
public void setChannel(Channel channel) {
this.channel = channel;
}
public String getUserId() {
return userId;
}
public void setUserId(String userId) {
this.userId = userId;
}
public class NettyUtil {
public static String parseChannelRemoteAddr(final Channel channel) {
if (null == channel) {
return "";
}
final SocketAddress remote = channel.remoteAddress();
final String addr = remote != null ? remote.toString() : "";
if (addr.length() > 0) {
int index = addr.lastIndexOf("/");
if (index >= 0) {
return addr.substring(index + 1);
}
return addr;
}
return "";
}
}
public class BlankUtil {
public static boolean isBlank(String ...strings){
for(String str:strings){
if (str == null || str.trim().length() == 0){
return true;
}
}
return false;
}
}
<template>
<div>
<div id="content" class="row-center">
<div
id="chat-box"
class="row-center"
style="height: 100px; width: 100px; border: 2px red ridge"
></div>
<div id="input-box">
<input class="chat-input" id="chat-input" placeholder="message" />
<input id="myid" placeholder="myid" />
<button id="login-button" @click="login()">登录</button>
<input id="friendid" placeholder="friendid" />
<button class="chat-button" id="send" @click="send()">发送</button>
</div>
</div>
</div>
</template>
<script>
import $ from "jquery";
var socket = null;
export default {
created() {
var ipAddress = "127.0.0.1";
//新建socket对象
socket = new WebSocket("ws://" + ipAddress + ":8086/ws");
},
methods: {
send() {
var data =
$("#myid").val() +
"," +
$("#friendid").val() +
"," +
$("#chat-input").val();
socket.send(data);
socket.onmessage = function (event) {
var datas = event.data.split(",");
console.log("服务器消息====" + datas);
$("#chat-box").text(datas);
};
},
login() {
var data = $("#myid").val();
// socket.onopen = function () {
socket.send(data);
// }
},
},
};
</script>
<style lang="less" scoped>
</style>
版权说明 : 本文为转载文章, 版权归原作者所有 版权申明
原文链接 : https://blog.csdn.net/weixin_44932487/article/details/115601347
内容来源于网络,如有侵权,请联系作者删除!