使用 Netty + Protobuf 开发自定义的 RPC 功能

x33g5p2x  于2022-06-06 转载在 其他  
字(5.4k)|赞(0)|评价(0)|浏览(230)

一 点睛

RPC 是指一个节点通过网络请求另一个节点提供的服务,这里所说的“提供的服务”可以具体化为“提供的方法”或“提供的属性”。

本篇实现一个最简单的 RPC 功能:调用的双方都是采用 Java 语言编写,并且调用的服务是远程提供的“属性”。

二 实战

1 将 message 以规定的语法结构编写到 Student.proto 文件中。

netty\protobuf\Student.proto

syntax = "proto2" ;

package netty.protobuf ;

option optimize_for = SPEED ;

option java_package = "netty.protobuf" ;

option java_outer_classname = "StudentMessage" ;

message Student

{

required string name = 1 ;

optional int32 age = 2 ;

}

2 根据  Student.proto 生成 Java 存储数据的数据结构,即 Java 类

E:\JVMDemo\demo2022>protoc --java_out=src/main/java src/main/java/netty/protobuf/Student.proto

该命令会根据 Student.proto 定义好的规则,在 src/main/java 目录下生成包 "netty.protobuf",并在该包中生成 StudentMessage.java

3 客户端主程序类

package netty.protobuf;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;

public class MyNettyClientTest {
    public static void main(String[] args) {
        EventLoopGroup eventLoopGroup = new NioEventLoopGroup();
        try {
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(eventLoopGroup).channel(NioSocketChannel.class).handler(new MyNettyClientInitializer());
            Channel channel = bootstrap.connect("127.0.0.1", 8888).sync().channel();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            eventLoopGroup.shutdownGracefully();
        }
    }
}

4 客户端初始化器

package netty.protobuf;

import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.protobuf.ProtobufEncoder;
import io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder;
import io.netty.handler.codec.protobuf.ProtobufVarint32LengthFieldPrepender;

public class MyNettyClientInitializer  extends ChannelInitializer<SocketChannel> {
    // 连接被注册后,立刻执行此方法
    protected void initChannel(SocketChannel sc) throws Exception {
        ChannelPipeline pipeline = sc.pipeline();
        /*思考:如何传递任何类型的数据呢?不要固定成PersonData.Person
            1.使用netty自定义协议:前几位编码,如果是a 则解码成....如果是b,则解码成...
            b.使用protobuf解决:
        */
//        pipeline.addLast("ProtobufDecoder",new ProtobufDecoder(PersonMessage.Person.getDefaultInstance()));//解码:字节->对象
        // PProtobufVarint32FrameDecoder 和 rotobufVarint32LengthFieldPrepender 用于解决半包和粘包问题,这里仅做了解
        pipeline.addLast("ProtobufVarint32FrameDecoder",new ProtobufVarint32FrameDecoder()) ;
        pipeline.addLast("ProtobufVarint32LengthFieldPrepender",new ProtobufVarint32LengthFieldPrepender());
        // 用于将 StudentMessage 类转为字节码
        pipeline.addLast("ProtobufEncoder",new ProtobufEncoder());
        // 构建 Student 对象,并发送给服务端
        pipeline.addLast("MyNettyClientHandler", new MyNettyClientHandler());
    }
}

5 客户端处理器

package netty.protobuf;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;

public class MyNettyClientHandler extends SimpleChannelInboundHandler<String> {
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, String receiveMsg) {
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        StudentMessage.Student Student = StudentMessage.Student.newBuilder().setName("zs").setAge(23).build();
        // 发送给服务单
        ctx.channel().writeAndFlush(Student);
    }
}

6 服务端主程序类

package netty.protobuf;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;

public class MyNettyServerTest {
    public static void main(String[] args) {
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            // ServerBootstrap:服务端启动时的初始化操作
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            // 将 bossGroup 和 workerGroup 注册到服务端的 Channel 上,并注册一个服务端的初始化器 NettyServerInitializer(该初始化器中的 initChannel() 方法,会在连接被注册后立刻执行);最后将端口号绑定到 8888
            ChannelFuture channelFuture = serverBootstrap
                    .group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .childHandler(new MyNettyServerInitializer())
                    .bind(8888).sync();
            channelFuture.channel().closeFuture().sync();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}

7 服务端初始化类

package netty.protobuf;

import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.protobuf.ProtobufDecoder;
import io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder;
import io.netty.handler.codec.protobuf.ProtobufVarint32LengthFieldPrepender;

public class MyNettyServerInitializer extends ChannelInitializer<SocketChannel> {
    protected void initChannel(SocketChannel sc) throws Exception {
        ChannelPipeline pipeline = sc.pipeline();
        pipeline.addLast("ProtobufVarint32FrameDecoder", new ProtobufVarint32FrameDecoder());
        // 用于将 byte[] 解码成 StudentMessage 对象
        pipeline.addLast("ProtobufDecoder", new ProtobufDecoder(StudentMessage.Student.getDefaultInstance()));
        pipeline.addLast("ProtobufVarint32LengthFieldPrepender", new ProtobufVarint32LengthFieldPrepender());
        // 打印 StudentMessage 对象
        pipeline.addLast("MyNettyServerHandler", new MyNettyServerHandler());
    }
}

8 服务端处理器

package netty.protobuf;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;

public class MyNettyServerHandler extends SimpleChannelInboundHandler<StudentMessage.Student> {
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, StudentMessage.Student receiveMsg) throws Exception {
        System.out.println(receiveMsg.getName() + "--" + receiveMsg.getAge());
    }
}

三 测试

先启动服务端,再通过客户端向服务端发送数据,服务端运行结果如下
zs--23

相关文章