gRPC 实战之——客户端为 Request,服务端为 Stream

x33g5p2x  于2022-06-06 转载在 其他  
字(4.1k)|赞(0)|评价(0)|浏览(305)

一 需求

根据课程名查询学生。

二 编写 proto 文件

syntax = "proto3";
package grpc.proto;
// Java package of the generated classes. It has to be "grpc.proto" because the Java
// sources that consume the generated code import grpc.proto.*.
option java_package = "grpc.proto";
option java_outer_classname = "StudentData";
// Generate each message/service as its own top-level Java file instead of nesting
// everything inside the outer class.
option java_multiple_files = true ;
// Service definition: declares one RPC for each combination of unary/streaming request and response.
service StudentService {
    // Unary: one request message in, one response message out.
    rpc queryStudentNameById(MyRequestId) returns(MyResponseName) {}
    // Server streaming: one request message in, a stream of responses out. This is the RPC used in this example.
    rpc queryStudentsByCourseName(MyRequestCourseName) returns(stream MyResponseStudentsStream) {}
    // Client streaming: a stream of requests in, one response message out.
    rpc queryStudentsByCourseName2(stream MyRequestCourseName) returns(MyResponseStudents) {}
    // Bidirectional streaming: a stream of requests in, a stream of responses out.
    rpc queryStudentNameById2(stream MyRequestId) returns(stream MyResponseName) {}
}

// Request message carrying a student id; the request type of
// queryStudentNameById and queryStudentNameById2.
message MyRequestId
{
    int32 id = 1 ;
}

// Response message carrying a student name; returned once by queryStudentNameById
// and as the stream element of queryStudentNameById2.
message MyResponseName
{
    string name = 1 ;
}

// A single student record; used as the element type of MyResponseStudents.students.
message MyStudent
{
    int32 id = 1 ;
    string name = 2;
    string courseName = 3 ;
}

// Response message holding a list of students; returned by queryStudentsByCourseName2.
message MyResponseStudents
{
    // The server answers with a collection, hence the repeated field.
    repeated MyStudent students = 1 ;
}



// Request message carrying a course name; the request type of
// queryStudentsByCourseName and queryStudentsByCourseName2.
message MyRequestCourseName
{
    string courseName = 1 ;
}
// One element of the response stream of queryStudentsByCourseName:
// each message describes a single student.
message MyResponseStudentsStream
{
    int32 id = 1 ;
    string name = 2;
    string courseName = 3 ;
}

三 编写接口实现类

package grpc;

import grpc.proto.*;
import io.grpc.stub.StreamObserver;

/**
 * Server-side implementation of {@code StudentService} backed by hard-coded mock data.
 *
 * <p>Only the unary RPC and the server-streaming RPC are overridden in this class.
 */
public class StudentServiceImpl extends StudentServiceGrpc.StudentServiceImplBase {

    /** Mock enrollment data standing in for a real data source: three students taking "java". */
    private static final MyResponseStudentsStream[] ENROLLED_STUDENTS = {
            newStudent(1, "zs", "java"),
            newStudent(2, "ls", "java"),
            newStudent(3, "ww", "java"),
    };

    /** Builds one stream element describing a student enrolled in a course. */
    private static MyResponseStudentsStream newStudent(int id, String name, String courseName) {
        return MyResponseStudentsStream.newBuilder()
                .setId(id)
                .setName(name)
                .setCourseName(courseName)
                .build();
    }

    /**
     * Unary RPC: logs the requested id and answers with a single, fixed name.
     *
     * @param request          request carrying the student id
     * @param responseObserver receives exactly one response, then completion
     */
    @Override
    public void queryStudentNameById(MyRequestId request, StreamObserver<MyResponseName> responseObserver) {
        System.out.println("模拟查询此id的用户名:" + request.getId());
        // Mock lookup: every id resolves to the name "zs".
        responseObserver.onNext(MyResponseName.newBuilder().setName("zs").build());
        responseObserver.onCompleted();
    }

    /**
     * Server-streaming RPC: streams every mock student enrolled in the requested course,
     * one message per student, then completes the stream.
     *
     * <p>A course with no enrolled students yields an empty (but properly completed) stream.
     *
     * @param request          request carrying the course name to look up
     * @param responseObserver receives one message per matching student, then completion
     */
    @Override
    public void queryStudentsByCourseName(MyRequestCourseName request, StreamObserver<MyResponseStudentsStream> responseObserver) {
        String courseName = request.getCourseName();
        for (MyResponseStudentsStream student : ENROLLED_STUDENTS) {
            // Honor the query: only students of the requested course are sent back.
            if (student.getCourseName().equals(courseName)) {
                responseObserver.onNext(student);
            }
        }
        responseObserver.onCompleted();
    }
}

四 编写服务端代码

package grpc;

import io.grpc.Server;
import io.grpc.ServerBuilder;

import java.io.IOException;
import java.util.concurrent.TimeUnit;

/**
 * Minimal gRPC server that registers {@link StudentServiceImpl} and serves it on a fixed port
 * until the JVM is shut down.
 */
public class MyGRPCServer {
    /** Port the server listens on. */
    private static final int PORT = 8888;
    /** Maximum time a graceful shutdown waits for in-flight calls before forcing termination. */
    private static final long SHUTDOWN_TIMEOUT_SECONDS = 30;

    private Server server;

    /**
     * Builds and starts the server, then registers a JVM shutdown hook that stops it.
     *
     * @throws IOException if the server cannot be started
     */
    private void start() throws IOException {
        server = ServerBuilder.forPort(PORT)
                .addService(new StudentServiceImpl())
                .build()
                .start();
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            System.err.println(Thread.currentThread().getName() + ",关闭JVM");
            // Stop the gRPC server together with the JVM.
            MyGRPCServer.this.stop();
        }));
    }

    /**
     * Stops the server: refuses new calls, waits a bounded time for running calls to finish,
     * and forces termination if they do not finish in time.
     *
     * <p>Waiting matters because this runs inside the shutdown hook: once the hook returns
     * the JVM exits, which would otherwise cut off calls that are still in progress.
     */
    private void stop() {
        if (server == null) {
            return;
        }
        server.shutdown();
        try {
            if (!server.awaitTermination(SHUTDOWN_TIMEOUT_SECONDS, TimeUnit.SECONDS)) {
                server.shutdownNow();
            }
        } catch (InterruptedException e) {
            // Interrupted while waiting: give up on the graceful path and keep the interrupt status.
            server.shutdownNow();
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Blocks the calling thread until the server has terminated.
     *
     * @throws InterruptedException if the waiting thread is interrupted
     */
    private void blockUntilShutdown() throws InterruptedException {
        if (server != null) {
            server.awaitTermination();
        }
    }

    /** Starts the server and keeps the main thread alive until the server terminates. */
    public static void main(String[] args) throws IOException, InterruptedException {
        final MyGRPCServer server = new MyGRPCServer();
        server.start();
        server.blockUntilShutdown();
    }
}

五 编写客户端代码

package grpc;

import grpc.proto.*;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;

import java.util.Iterator;

/**
 * Demo client: sends one course-name request and prints every student
 * returned on the response stream.
 */
public class MyGRPCClient {
    public static void main(String[] args) throws Exception {
        // Plaintext channel to the server address used by this example.
        ManagedChannel channel = ManagedChannelBuilder
                .forAddress("127.0.0.1", 8888)
                .usePlaintext()
                .build();
        try {
            // The blocking stub is the local proxy through which the remote methods are invoked.
            StudentServiceGrpc.StudentServiceBlockingStub blockingStub =
                    StudentServiceGrpc.newBlockingStub(channel);

            MyRequestCourseName query = MyRequestCourseName.newBuilder()
                    .setCourseName("java")
                    .build();

            // Single request in, stream of responses out; the stub hands the stream back as an Iterator.
            Iterator<MyResponseStudentsStream> replies = blockingStub.queryStudentsByCourseName(query);
            replies.forEachRemaining(reply ->
                    System.out.println(reply.getId() + "\t" + reply.getName() + "\t" + reply.getCourseName()));
        } finally {
            channel.shutdown();
        }
    }
}

六 测试

1 先启动服务端(MyGRPCServer),再启动客户端(MyGRPCClient)

2 客户端运行结果如下

1    zs    java

2    ls    java

3    ww    java

相关文章