根据 id 查 name
syntax = "proto3";
package grpc.proto;
option java_package = "com.grpc.proto";
option java_outer_classname = "StudentData";
option java_multiple_files = true ;
// 定义接口
service StudentService {
// 请求一个 Requset 对象,响应一个 Response 对象
rpc queryStudentNameById(MyRequestId) returns(MyResponseName) {}
// 请求一个 Requset 对象,响应一个 Stream 对象
rpc queryStudentsByCourseName(MyRequestCourseName) returns(stream MyResponseStudentsStream) {}
// 请求一个 Stream 对象,响应一个 Response 对象
rpc queryStudentsByCourseName2(stream MyRequestCourseName) returns(MyResponseStudents) {}
// 请求一个 Stream,响应一个 Stream 对象,本例测试这个接口
rpc queryStudentNameById2(stream MyRequestId) returns(stream MyResponseName) {}
}
message MyRequestId
{
int32 id = 1 ;
}
message MyResponseName
{
string name = 1 ;
}
message MyStudent
{
int32 id = 1 ;
string name = 2;
string courseName = 3 ;
}
message MyResponseStudents
{
// 服务端的响应结果是集合类型,因此需要加上 repeated
repeated MyStudent students = 1 ;
}
// 数据结构,定义请求的 Request 对象
message MyRequestCourseName
{
string courseName = 1 ;
}
// 数据结构,定义响应的 Stream
message MyResponseStudentsStream
{
int32 id = 1 ;
string name = 2;
string courseName = 3 ;
}
package grpc;
import grpc.proto.*;
import io.grpc.stub.StreamObserver;
public class StudentServiceImpl extends StudentServiceGrpc.StudentServiceImplBase {
@Override
public StreamObserver<MyRequestId> queryStudentNameById2(StreamObserver<MyResponseName> responseObserver) {
MyStreamObserver2 observer = new MyStreamObserver2();
observer.setResponseObserver(responseObserver);
return observer;
}
class MyStreamObserver2 implements StreamObserver<MyRequestId> {
private StreamObserver<MyResponseName> responseObserver;
private MyResponseName responseStudentName;
public void setResponseObserver(StreamObserver<MyResponseName> responseObserver) {
this.responseObserver = responseObserver;
}
@Override
public void onNext(MyRequestId value) {
System.out.println("接收到的请求参数是:" + value.getId());
// 假设查到的结果是“zs”
this.responseStudentName = MyResponseName.newBuilder().setName("zs").build();
}
@Override
public void onError(Throwable t) {
t.printStackTrace();
}
@Override
public void onCompleted() {
responseObserver.onNext(responseStudentName);
responseObserver.onCompleted();
}
}
}
package grpc;
import io.grpc.Server;
import io.grpc.ServerBuilder;
import java.io.IOException;
public class MyGRPCServer {
private Server server;
// 启动服务
private void start() throws IOException {
int port = 8888;
server = ServerBuilder.forPort(port)
.addService(new StudentServiceImpl())
.build()
.start();
Runtime.getRuntime().addShutdownHook(new Thread(() ->{
System.err.println(Thread.currentThread().getName() + ",关闭JVM");
// 当 JVM 关闭时,也同时关闭 MyGRPCServer服 务
MyGRPCServer.this.stop();
}
));
}
// 关闭服务
private void stop() {
if (server != null) {
server.shutdown();
}
}
private void blockUntilShutdown() throws InterruptedException {
if (server != null) {
// 等待服务结束
server.awaitTermination();
}
}
public static void main(String[] args) throws IOException, InterruptedException {
final MyGRPCServer server = new MyGRPCServer();
server.start();
server.blockUntilShutdown();
}
}
package grpc;
import grpc.proto.MyRequestId;
import grpc.proto.MyResponseName;
import grpc.proto.StudentServiceGrpc;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.stub.StreamObserver;
public class MyGRPCClient {
public static void main(String[] args) throws Exception {
// 创建一个客户端
ManagedChannel client = ManagedChannelBuilder.forAddress("127.0.0.1", 8888)
.usePlaintext().build();
// 在 grpc 中,如果是以 Stream 方式发出请求,则此请求是异步的。因此,不能再使用阻塞式 stub 对象。
StudentServiceGrpc.StudentServiceStub stub = StudentServiceGrpc
.newStub(client);
// 请求一个 Stream,响应一个 Stream
StreamObserver<MyRequestId> requestIdObserver = stub.queryStudentNameById2(new StreamObserver<MyResponseName>() {
@Override
public void onNext(MyResponseName value) {
System.out.println("接收到的响应:" + value.getName());
}
@Override
public void onError(Throwable t) {
t.printStackTrace();
}
@Override
public void onCompleted() {
System.out.println("查询结束");
}
});
requestIdObserver.onNext(MyRequestId.newBuilder().setId(1).build());
requestIdObserver.onCompleted();
Thread.sleep(3000);
client.shutdown();
}
}
接收到的请求参数是:1
接收到的响应:zs
查询结束
版权说明 : 本文为转载文章, 版权归原作者所有 版权申明
原文链接 : https://blog.csdn.net/chengqiuming/article/details/125134076
内容来源于网络,如有侵权,请联系作者删除!