Java gRPC 客户端抛出 UNAVAILABLE:注入 Istio sidecar 后出现 io exception

lrl1mhuk  于 2021-07-06  发布在  Java
关注(0)|答案(0)|浏览(202)

我使用的是客户端流(client streaming)模式。在没有注入 sidecar 的情况下,客户端可以正常连接服务器;但注入 sidecar 之后连接就会失败。我不明白为什么会出现这个问题。
................................. ................................. ................................. ................................. ................................. .................................
这是我的代码:
streaming.proto

// Protocol Buffers (proto3) schema for the client-streaming example service.
syntax = "proto3";

// Java code generation options: generated classes go into the
// io.grpc.examples.streaming package, one file per top-level type,
// with StreamingProto as the outer class name.
option java_multiple_files = true;
option java_package = "io.grpc.examples.streaming";
option java_outer_classname = "StreamingProto";
// Class-name prefix used for generated Objective-C code.
option objc_class_prefix = "HLW";

package Streaming;
// Provides google.protobuf.Empty, used as the response type of GoHello.
import public "google/protobuf/empty.proto";

// Service exposing a single client-streaming RPC.
service StreamingServer {
  // The client sends a stream of HelloRequest messages; the server answers
  // with a single google.protobuf.Empty.
  rpc GoHello (stream HelloRequest) returns (google.protobuf.Empty) {}
}

// Request message streamed by the client.
message HelloRequest {
  string name  = 1;
}

// Not referenced by any rpc in this file.
// NOTE(review): the name looks like a typo of "HelloReply" - confirm before renaming.
message HelloReplay {
  string message = 1;
}

StreamingClient.java 文件

import com.google.protobuf.Empty;
import io.grpc.Channel;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.examples.streaming.HelloRequest;
import io.grpc.examples.streaming.StreamingServerGrpc;
import io.grpc.netty.shaded.io.netty.util.internal.StringUtil;
import io.grpc.stub.StreamObserver;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Example client for the client-streaming {@code GoHello} RPC.
 *
 * <p>The client opens one stream and sends the same {@link HelloRequest} every 200 ms until the
 * call terminates (either the server completes it or it fails). The target host and port are
 * read from the {@code GLAHA_GRPC_SERVICE} and {@code GLAHA_GRPC_PORT} environment variables and
 * default to {@code localhost:50051}.
 */
public class StreamingClient {

  private static final String DEFAULT_SERVICE = "localhost";
  private static final String DEFAULT_PORT = "50051";

  private final AtomicInteger counter = new AtomicInteger(0);
  private final StreamingServerGrpc.StreamingServerStub stub;

  /**
   * Set by the response observer (on a gRPC callback thread) once the call has ended; read by
   * the sending loop. {@code volatile} so the sending thread sees the update.
   */
  private volatile boolean streamTerminated;

  /**
   * Creates a client that issues calls over the given channel.
   *
   * @param channel channel to the streaming server; the caller owns it and must shut it down
   */
  public StreamingClient(Channel channel) {
    stub = StreamingServerGrpc.newStub(channel);
  }

  /**
   * Builds a plaintext channel to the configured target, streams requests until the call ends,
   * then shuts the channel down.
   *
   * @param args unused
   * @throws InterruptedException if interrupted while streaming or awaiting channel termination
   * @throws NumberFormatException if {@code GLAHA_GRPC_PORT} is set but is not an integer
   */
  public static void main(String[] args) throws InterruptedException {
    String serviceName = envOrDefault("GLAHA_GRPC_SERVICE", DEFAULT_SERVICE);
    String port = envOrDefault("GLAHA_GRPC_PORT", DEFAULT_PORT);
    System.out.println(serviceName + ":" + port);

    // NOTE(review): keepAliveWithoutCalls(true) is set without a keepAliveTime; per the
    // ManagedChannelBuilder docs keepalive is disabled by default, so confirm this has the
    // intended effect.
    ManagedChannel channel =
        ManagedChannelBuilder.forAddress(serviceName, Integer.parseInt(port))
            .usePlaintext()
            .keepAliveWithoutCalls(true)
            .build();

    try {
      StreamingClient helloWorldClient = new StreamingClient(channel);
      helloWorldClient.go();
    } finally {
      channel.shutdownNow().awaitTermination(5, TimeUnit.SECONDS);
    }
  }

  /** Returns the value of environment variable {@code name}, or {@code fallback} if unset/empty. */
  private static String envOrDefault(String name, String fallback) {
    String value = System.getenv(name);
    return (value == null || value.isEmpty()) ? fallback : value;
  }

  /**
   * Opens the {@code GoHello} stream, waits 5 seconds, then sends one request every 200 ms until
   * the response observer reports that the call has terminated.
   *
   * @throws InterruptedException if interrupted while sleeping between sends
   */
  private void go() throws InterruptedException {
    HelloRequest helloRequest = HelloRequest.newBuilder().setName("goHello").build();

    StreamObserver<Empty> responseObserver =
        new StreamObserver<Empty>() {
          @Override
          public void onNext(Empty value) {
            System.out.println("value.getMessage()");
          }

          @Override
          public void onError(Throwable t) {
            System.out.println("error gohello: " + t.getMessage());
            // The message alone hides the underlying cause; dump the whole chain.
            t.printStackTrace(System.err);
            streamTerminated = true;
          }

          @Override
          public void onCompleted() {
            System.out.println("stream completed");
            streamTerminated = true;
          }
        };

    StreamObserver<HelloRequest> observer = stub.goHello(responseObserver);
    TimeUnit.MILLISECONDS.sleep(5000);
    // Stop sending once the call is over; writing to a terminated stream is pointless and
    // would keep main() from ever reaching its finally block.
    while (!streamTerminated) {
      System.out.println("client sent counter:" + counter.incrementAndGet());
      observer.onNext(helloRequest);
      TimeUnit.MILLISECONDS.sleep(200);
    }
  }
}

StreamingServer.java 文件

import com.google.protobuf.Empty;
import io.grpc.Server;
import io.grpc.ServerBuilder;
import io.grpc.examples.streaming.HelloRequest;
import io.grpc.examples.streaming.StreamingServerGrpc;
import io.grpc.stub.StreamObserver;

import java.io.IOException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Example gRPC server hosting the client-streaming {@code GoHello} RPC on port 50051.
 */
public class StreamingServer {

  private Server server;

  /**
   * Starts the gRPC server on port 50051 and registers a JVM shutdown hook that stops it.
   *
   * @throws IOException if the server cannot be started
   */
  private void start() throws IOException {
    int port = 50051;

    server = ServerBuilder.forPort(port).addService(new StreamingServerImpl()).build().start();

    System.out.println("server up....");
    Runtime.getRuntime()
        .addShutdownHook(
            new Thread() {
              @Override
              public void run() {
                // Use stderr here since the logger may have been reset by its JVM shutdown hook.
                System.err.println("***shutting down gRPC server since JVM is shutting down");
                try {
                  StreamingServer.this.stop();
                } catch (InterruptedException e) {
                  e.printStackTrace(System.err);
                  // Restore the interrupt status instead of silently dropping it.
                  Thread.currentThread().interrupt();
                }
                System.err.println("***server shut down");
              }
            });
  }

  /**
   * Starts the server and blocks until it terminates.
   *
   * @param args unused
   * @throws IOException if the server cannot be started
   * @throws InterruptedException if interrupted while waiting for termination
   */
  public static void main(String[] args) throws IOException, InterruptedException {
    StreamingServer server = new StreamingServer();
    server.start();
    server.blockUntilShutdown();
  }

  /**
   * Initiates an orderly shutdown and waits up to 30 seconds for it to finish.
   *
   * @throws InterruptedException if interrupted while waiting
   */
  private void stop() throws InterruptedException {
    if (server != null) {
      server.shutdown().awaitTermination(30, TimeUnit.SECONDS);
    }
  }

  /**
   * Blocks the calling thread until the server has terminated.
   *
   * @throws InterruptedException if interrupted while waiting
   */
  private void blockUntilShutdown() throws InterruptedException {
    if (server != null) {
      server.awaitTermination();
    }
  }

  /** Service implementation that counts every request received across all streams. */
  static class StreamingServerImpl extends StreamingServerGrpc.StreamingServerImplBase {

    private final AtomicInteger counter = new AtomicInteger(0);

    /**
     * Handles one client stream: logs a running counter for each request and, when the client
     * half-closes, replies with a single {@link Empty} and completes the call.
     *
     * @param responseObserver observer used to send the single response to the client
     * @return observer that receives the client's streamed requests
     */
    @Override
    public StreamObserver<HelloRequest> goHello(StreamObserver<Empty> responseObserver) {
      return new StreamObserver<HelloRequest>() {
        @Override
        public void onNext(HelloRequest value) {
          System.out.println("streaming receive counter:" + counter.incrementAndGet());
        }

        @Override
        public void onError(Throwable t) {
          System.out.println("responseObserver error:" + t.getMessage());
        }

        @Override
        public void onCompleted() {
          // GoHello returns exactly one message; completing without sending it makes
          // grpc-java fail the call instead of closing it with OK.
          responseObserver.onNext(Empty.getDefaultInstance());
          responseObserver.onCompleted();
        }
      };
    }
  }
}

Kubernetes 和 Istio 的 YAML 配置

---
# Deployment running two replicas of the stream-server container.
apiVersion: apps/v1
kind: Deployment
metadata:
  name: stream-server-deployment
  labels:
    app: stream-server
spec:
  replicas: 2
  selector:
    matchLabels:
      app: stream-server
  template:
    metadata:
      annotations:
        # Requests Istio sidecar injection for these pods.
        sidecar.istio.io/inject: "true"
      labels:
        # Must match spec.selector.matchLabels above; also selected by stream-server-service.
        app: stream-server
    spec:
      containers:
      - name: stream-server
        image: stream-server-local:latest
        # The image is never pulled, so it has to be present on the node already.
        imagePullPolicy: Never
        ports:
        # Same port the StreamingServer class listens on (50051).
        - containerPort: 50051
---
# Service exposing the stream-server pods on port 50051.
apiVersion: v1
kind: Service
metadata:
  name: stream-server-service
spec:
  selector:
    app: stream-server
  ports:
    # Service port 50051 forwards to container port 50051.
    - protocol: TCP
      port: 50051
      targetPort: 50051
      # Istio can derive the application protocol from the port name (here: grpc).
      name: grpc
---
# Environment for the client: the host and port that StreamingClient.main reads
# from GLAHA_GRPC_SERVICE / GLAHA_GRPC_PORT.
apiVersion: v1
kind: ConfigMap
metadata:
  name: stream-client-env-configmap
data:
  # Name of the Service defined for the server pods.
  GLAHA_GRPC_SERVICE: "stream-server-service"
  GLAHA_GRPC_PORT: "50051"
---
# Deployment running a single replica of the stream-client container.
apiVersion: apps/v1
kind: Deployment
metadata:
  name: stream-client-deployment
  labels:
    app: stream-client
spec:
  replicas: 1
  selector:
    matchLabels:
      app: stream-client
  template:
    metadata:
      annotations:
        # Requests Istio sidecar injection for this pod.
        sidecar.istio.io/inject: "true"
      labels:
        # Must match spec.selector.matchLabels above.
        app: stream-client
    spec:
      containers:
      - name: stream-client
        image: stream-client-local:latest
        # The image is never pulled, so it has to be present on the node already.
        imagePullPolicy: Never
        # Injects every key of the ConfigMap as an environment variable.
        envFrom:
          - configMapRef:
              name: stream-client-env-configmap
---

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题