gRPC 实战之——客户端为 Stream,服务端为 Response

x33g5p2x  于2022-06-06 转载在 其他  
字(4.9k)|赞(0)|评价(0)|浏览(450)

一 需求

根据课程名查询学生。

二 编写 proto 文件

syntax = "proto3";
package grpc.proto;
option java_package = "com.grpc.proto";
option java_outer_classname = "StudentData";
option java_multiple_files = true ;
// 定义接口
service StudentService {
    // 请求一个 Requset 对象,响应一个 Response 对象
    rpc queryStudentNameById(MyRequestId) returns(MyResponseName) {}
    // 请求一个 Requset 对象,响应一个 Stream 对象
    rpc queryStudentsByCourseName(MyRequestCourseName) returns(stream MyResponseStudentsStream) {}
    // 请求一个 Stream 对象,响应一个 Response 对象,本例用到的是这个接口
    rpc queryStudentsByCourseName2(stream MyRequestCourseName) returns(MyResponseStudents) {}
    // 请求一个 Stream,响应一个 Stream 对象
    rpc queryStudentNameById2(stream MyRequestId) returns(stream MyResponseName) {}
}

message MyRequestId
{
    int32 id = 1 ;
}

message MyResponseName
{
    string name = 1 ;
}

message MyStudent
{
    int32 id = 1 ;
    string name = 2;
    string courseName = 3 ;
}

message MyResponseStudents
{
    // 服务端的响应结果是集合类型,因此需要加上 repeated
    repeated MyStudent students = 1 ;
}

// 数据结构,定义请求的 Request 对象
message MyRequestCourseName
{
    string courseName = 1 ;
}
// 数据结构,定义响应的 Stream
message MyResponseStudentsStream
{
    int32 id = 1 ;
    string name = 2;
    string courseName = 3 ;
}

三 编写接口实现类

package grpc;

import grpc.proto.*;
import io.grpc.stub.StreamObserver;

public class StudentServiceImpl extends StudentServiceGrpc.StudentServiceImplBase {
    // 向客户端返回一个 StreamObserver 对象,服务端给客户端返回了一个 StreamObserver 对象,即响应了一个 StreamObserver 类型的 Response 对象。
    @Override
    public StreamObserver<MyRequestCourseName> queryStudentsByCourseName2(StreamObserver<MyResponseStudents> responseObserver) {
        MyStreamObserver observer = new MyStreamObserver();
        observer.setResponseObserver(responseObserver);
        return observer;
    }

    class MyStreamObserver implements StreamObserver<MyRequestCourseName> {
        private StreamObserver<MyResponseStudents> responseObserver;
        private MyResponseStudents responseStudents;

        public void setResponseObserver(StreamObserver<MyResponseStudents> responseObserver) {
            this.responseObserver = responseObserver;
        }

        @Override
        public void onNext(MyRequestCourseName value) {
            System.out.println("接收到的请求参数是:" + value.getCourseName());
            // 根据 value.getCourseName() 模拟查询操作...
            MyStudent student1 = MyStudent.newBuilder().setId(1).setName("zs").setCourseName("java").build();
            MyStudent student2 = MyStudent.newBuilder().setId(2).setName("ls").setCourseName("java").build();
            // 将查询结果放入 responseStudents 中
            this.responseStudents = MyResponseStudents.newBuilder().addStudents(student1).addStudents(student2).build();
        }

        @Override
        public void onError(Throwable t) {
            t.printStackTrace();
        }

        @Override
        public void onCompleted() {
            // 将查询结果放入 responseStudents 中
            responseObserver.onNext(responseStudents);
            responseObserver.onCompleted();
        }
    }
}

四 编写服务端代码

package grpc;

import io.grpc.Server;
import io.grpc.ServerBuilder;

import java.io.IOException;

public class MyGRPCServer {
    private Server server;

    // 启动服务
    private void start() throws IOException {
        int port = 8888;
        server = ServerBuilder.forPort(port)
                .addService(new StudentServiceImpl())
                .build()
                .start();
        Runtime.getRuntime().addShutdownHook(new Thread(() ->{
                System.err.println(Thread.currentThread().getName() + ",关闭JVM");
                // 当 JVM 关闭时,也同时关闭 MyGRPCServer服 务
                MyGRPCServer.this.stop();
            }
        ));
    }

    // 关闭服务
    private void stop() {
        if (server != null) {
            server.shutdown();
        }
    }

    private void blockUntilShutdown() throws InterruptedException {
        if (server != null) {
            // 等待服务结束
            server.awaitTermination();
        }
    }

    public static void main(String[] args) throws IOException, InterruptedException {
        final MyGRPCServer server = new MyGRPCServer();
        server.start();
        server.blockUntilShutdown();
    }
}

五 编写客户端代码

package grpc;

import grpc.proto.MyRequestCourseName;
import grpc.proto.MyResponseStudents;
import grpc.proto.StudentServiceGrpc;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.stub.StreamObserver;

public class MyGRPCClient {
    public static void main(String[] args) throws Exception {
        // 创建一个客户端
        ManagedChannel client = ManagedChannelBuilder.forAddress("127.0.0.1", 8888)
                .usePlaintext().build();

        // 在 grpc 中,如果是以 Stream 方式发出请求,则此请求是异步的。因此,不能再使用阻塞式 stub 对象。
        StudentServiceGrpc.StudentServiceStub stub = StudentServiceGrpc
                .newStub(client);

        // 请求 stream,响应 response 对象
        // 接收服务端返回的 StreamObserver 类型的响应结果
        StreamObserver<MyResponseStudents> students = new StreamObserver<MyResponseStudents>() {
            @Override
            public void onNext(MyResponseStudents value) {
                value.getStudentsList().forEach((student) -> {
                    System.out.println(student.getId() + "\t" + student.getName() + "\t" + student.getCourseName());
                });
            }

            @Override
            public void onError(Throwable t) {
                t.printStackTrace();
            }

            @Override
            public void onCompleted() {
                System.out.println("查询结束");
            }
        };

        // 准备一个 StreamObserver 流,用于向服务端发送请求
        StreamObserver<MyRequestCourseName> myRequestObserver = stub.queryStudentsByCourseName2(students);
        myRequestObserver.onNext(MyRequestCourseName.newBuilder().setCourseName("java").build());
        // 如果是向服务端发出多个 Stream 请求,则可以写多个 onNext(),如下
        // myRequestObserver.onNext( MyRequestCourseName.newBuilder().setCourseName("python").build());
        myRequestObserver.onCompleted();
        // 因为请求是异步的,所以客户端在发出请求后不会立刻得到响应结果。本程序通过休眠来模拟等待服务端的执行过程。
        Thread.sleep(3000);
        client.shutdown();
    }
}

六 测试

1 启动服务端

2 启动客户端

3 服务端打印

接收到的请求参数是:java

4 客户端打印

1    zs    java

2    ls    java

查询结束

相关文章