c++ 同时提供异步和同步服务的Grpc服务器

u91tlkcl  于 2023-04-13  发布在  其他
关注(0)|答案(1)|浏览(255)

我需要能够为来自主线程的一些特定请求提供响应,而其余的可以从任何线程到达。考虑到这一点,我创建了一个GRPC服务器,它有2个服务,一个是作为AsyncService实现的,另一个是作为同步服务。
但添加完成队列后,同步服务不再响应请求。

builder.RegisterService(this); // this inherits from Service (sync)
    builder.RegisterService(&m_service); // m_services is an AsyncService 
    m_mainThreadQueue = builder.AddCompletionQueue();
    m_server = std::unique_ptr<Server>(builder.BuildAndStart());
    {
        (new GrabSnapshotCallData(this, &m_service, m_mainThreadQueue.get()))->Proceed();
    }
    m_server->Wait();

添加完成队列使得同步服务不再响应请求。我在任何地方都找不到关于这个特定主题的很多信息,所以也许grpc并不真正支持它。
那么,有没有一种方法可以在同一台服务器上同时拥有异步和同步服务呢?如果没有,我应该怎么做来模拟这种行为呢

jhdbpxl9

jhdbpxl91#

下面是一个简单的c++示例

#include <iostream>
#include <string>
#include <grpcpp/grpcpp.h>
#include "gRpcAutoGenerated/hellostreamingworld.grpc.pb.h"

class ServiceImpl_sync : public hellostreamingworld::MultiGreeter::Service
{
public:
  grpc::Status sayHello_Sync(grpc::ServerContext *context, const hellostreamingworld::HelloRequest *request, grpc::ServerWriter<hellostreamingworld::HelloReply> *responsWriter) override
  {
    std::cout << "sayHello_Sync" << std::endl;

    for (int i = 0; i < 10; i++)
    {
      hellostreamingworld::HelloReply newHelloReply;
      newHelloReply.set_message("Hello Sync " + request->name() + " " + std::to_string(i));
      responsWriter->Write(newHelloReply);
    }
    return grpc::Status::OK;
  };
};

class sayHello_aSync_handler final
{

public:
  sayHello_aSync_handler(hellostreamingworld::MultiGreeter::WithAsyncMethod_sayHello_aSync<ServiceImpl_sync> *service, grpc::ServerCompletionQueue *cq)
  {
    service_ = service;
    cq_ = cq;
    responsWriter_ = new grpc::ServerAsyncWriter<hellostreamingworld::HelloReply>(&context_);
    service_->RequestsayHello_aSync(&context_, &request_, responsWriter_, cq_, cq_, this);
    state_ = CallState::Status_Write;
    cycleCounter_ = 0;
  }

  void Procss()
  {
    std::cout << "sayHello_aSync_handler::Procss" << std::endl;
    if (state_ == CallState::Status_Write)
    {
      cycleCounter_++;
      hellostreamingworld::HelloReply newHelloReply;
      newHelloReply.set_message("Hello aSync " + request_.name() + " " + std::to_string(cycleCounter_));
      if (cycleCounter_ < 10)
      {
        responsWriter_->Write(newHelloReply, this);
      }
      else
      {
        state_ = CallState::Status_Finish;
        responsWriter_->Finish(grpc::Status::OK, this);
      }
    }
  }

  enum CallState
  {
    Status_Write,
    Status_Finish
  };

  hellostreamingworld::MultiGreeter::WithAsyncMethod_sayHello_aSync<ServiceImpl_sync> *service_;
  grpc::ServerCompletionQueue *cq_;
  grpc::ServerContext context_;
  hellostreamingworld::HelloRequest request_;
  grpc::ServerAsyncWriter<hellostreamingworld::HelloReply> *responsWriter_;
  int cycleCounter_;
  CallState state_;
};

int main(int argc, char **argv)
{
  std::string server_address("0.0.0.0:50051");
  grpc::ServerBuilder builder;
  builder.AddListeningPort(server_address, grpc::InsecureServerCredentials());
  
  ServiceImpl_sync SyncService;
  hellostreamingworld::MultiGreeter::WithAsyncMethod_sayHello_aSync<ServiceImpl_sync> aSyncService;
  builder.RegisterService(&aSyncService);
  
  std::unique_ptr<grpc::ServerCompletionQueue> cq = builder.AddCompletionQueue();
  std::unique_ptr<grpc::Server> server = builder.BuildAndStart();

  std::cout << "Server listening on " << server_address << std::endl;

  new sayHello_aSync_handler(&aSyncService, cq.get());

  void *got_tag = nullptr;
  bool ok = false;
  while (true)
  {
    std::cerr << "Blocked by cq->Next" << std::endl;
    if (!cq->Next(&got_tag, &ok))
    {
      std::cerr << "Server stream closed. Quitting" << std::endl;
    }
    else if (ok)
    {
      static_cast<sayHello_aSync_handler *>(got_tag)->Procss();
    }
  }

  return 0;
}

hellostreamingworld.proto

syntax = "proto3";

option java_package = "ex.grpc";
option objc_class_prefix = "HSW";

package hellostreamingworld;

// The greeting service definition.
service MultiGreeter {
  // Sends multiple greetings
  rpc sayHello_Sync (HelloRequest) returns (stream HelloReply) {}
  rpc sayHello_aSync (HelloRequest) returns (stream HelloReply) {}
}

// The request message containing the user's name and how many greetings
// they want.
message HelloRequest {
  string name = 1;
  string num_greetings = 2;
}

// A response message containing a greeting
message HelloReply {
  string message = 1;
}

相关问题