Grpc客户端异步(c++)

nzkunb0c  于 2023-06-07  发布在  其他
关注(0)|答案(1)|浏览(441)

我正在尝试使用gRPC(C++)创建一个异步客户端。我希望客户端每秒打印一个点;当它从服务器收到一个符号(在我的例子中是回车符 '\n')时,打印一条消息说明已收到该符号。只要还没有收到该符号,它就继续循环并打印点 '.'。我下面的代码不能正常工作:它能打印点,但似乎无法识别来自服务器的符号:
客户:

#include <stdlib.h>
#include <unistd.h>

#include <atomic>
#include <chrono>
#include <iostream>
#include <memory>
#include <string>
#include <thread>

#include <grpcpp/grpcpp.h>

#include "dts.grpc.pb.h"

using grpc::Channel;
using grpc::ClientContext;
using grpc::ClientAsyncResponseReader;
using grpc::Status;
using grpc::CompletionQueue;

class DTSClient{
    public:
        explicit DTSClient(std::string socket);
        ~DTSClient() {}

        void run() {
            while(true){
                EntryCreateRequest request;
                ClientContext context;
                EntryResponse response;
                CompletionQueue cq;
                Status status;
            
                unsigned int microsecond = 1000000;
                usleep(1 * microsecond); //sleeps for 1 second
                std::cout << "." << std::endl;
                
                auto reader = stub_->PrepareAsyncrun_stream(&context, request, &cq);
                reader->StartCall();

                std::cout << *(response.title().c_str()) <<std::endl;
                if (*(response.title().c_str()) == '\n') {
                    std::cout << "I got enter" << std::endl;
                    std::cout << "\n" << std::endl;
                }

                reader->Finish(&response, &status, (void*)1);

                
            
                
                }
                
        }

    
  void AsyncCompleteRpc() {
    void* got_tag;
    bool ok = false;
    CompletionQueue cq;
    EntryResponse response;

    while (cq.Next(&got_tag, &ok)) {
      if (ok) {
        GPR_ASSERT(got_tag == (void*)1);

        std::cout << "Greeter received: " << response.title() << std::endl;
      } else {
        std::cout << "RPC failed" << std::endl;
      }
    }
  }
    
    private:
        std::shared_ptr<grpc::Channel> channel_;
        std::unique_ptr<DTSService::Stub> stub_;

};

// Connects to the Unix domain socket at `socket_file`: the path is prefixed
// with "unix://" to form the channel target, an insecure channel is created
// for it, and the service stub is built on that channel.
DTSClient::DTSClient(std::string socket_file)
    : channel_(grpc::CreateChannel("unix://" + socket_file,
                                   grpc::InsecureChannelCredentials())),
      stub_(DTSService::NewStub(channel_)) {}

int main(int argc, char** argv) {
    DTSClient dts_client("/tmp/grpc.sock");
    std::cout << "Client start" << std::endl;

    std::thread thread_ = std::thread(&DTSClient::AsyncCompleteRpc, &dts_client);

    dts_client.run();

    thread_.join();

    return 0;
}

服务器:

# import required libraries & proto defn.
import grpc
from concurrent import futures
import sys
import os
import logging
import time
import subprocess
import asyncio

sys.path.append("/labhome/rdomnitz/workspace/collectx/Collectx/services/dts/proto")
import dts_pb2_grpc, dts_pb2

# Filesystem path of the Unix domain socket the server listens on
# (main() prefixes it with "unix://" when binding the port).
SERVER_ADDRESS = "/tmp/grpc.sock"

class Listener(dts_pb2_grpc.DTSServiceServicer):
    """
    Servicer implementing the DTSService RPC handlers.

    run_command executes a shell command and returns its output;
    run_stream yields a single response holding one character read from stdin.
    """

    # (std_out, std_err, rc, timeout_expired) returned when the command could
    # not be completed. NOTE(review): these placeholder values assume string
    # output fields and a signed rc field in RunCommandResponse — confirm
    # against the .proto.
    _TIMEOUT_COMMAND_RESPONSE_TUPLE = ("", "", -1, True)

    def run_command(self, request, context):
        """
        Run ``request.command`` through the shell, waiting at most
        ``request.timeout`` seconds, and return a RunCommandResponse with the
        captured stdout/stderr, the return code and a timeout flag.

        If waiting for the command raises (including on timeout), the process
        is killed and the placeholder failure response is returned.
        """
        cmd = request.command
        timeout = request.timeout

        logging.debug("run_command: %s", cmd)
        # SECURITY NOTE(review): the command comes straight from the request and
        # is executed with shell=True; only expose this to trusted callers.
        proc = subprocess.Popen(cmd, shell=True,
                                stdout=subprocess.PIPE,
                                stderr=subprocess.PIPE,
                                universal_newlines=True,
                                stdin=subprocess.PIPE)
        try:
            std_out, std_err = proc.communicate(timeout=timeout)
            rcode = proc.returncode
            timeout_expired = False
        except Exception as exp:
            proc.kill()
            # Reap the killed process so it does not linger as a zombie.
            proc.wait()
            logging.error("run_command: %s failed with error: %s", cmd, exp)
            std_out, std_err, rcode, timeout_expired = self._TIMEOUT_COMMAND_RESPONSE_TUPLE
        return dts_pb2.RunCommandResponse(std_out=std_out, std_err=std_err,
                                          rc=rcode, timeout_expired=timeout_expired)

    def run_stream(self, request, context):
        """
        Block until one character is available on the server's stdin, then
        yield a single EntryResponse whose title is that character.
        """
        print("Wait for input")
        # Named `char` rather than `input` to avoid shadowing the builtin.
        char = sys.stdin.read(1)
        yield dts_pb2.EntryResponse(title=char)


def main():
    """
    Create the gRPC server, register the Listener servicer, bind it to the
    Unix socket at SERVER_ADDRESS and serve until terminated.

    On KeyboardInterrupt the server is stopped immediately.
    """
    try:
        grpc_server = grpc.server(futures.ThreadPoolExecutor())
        dts_pb2_grpc.add_DTSServiceServicer_to_server(Listener(), grpc_server)

        listen_address = f"unix://{SERVER_ADDRESS}"
        grpc_server.add_insecure_port(listen_address)

        logging.debug("start server")
        grpc_server.start()
        grpc_server.wait_for_termination()
    except KeyboardInterrupt:
        print("KeyboardInterrupt")
        grpc_server.stop(0)

# Script entry point: enable DEBUG-level logging, then start the server.
if __name__ == "__main__":
    logging.basicConfig(level=logging.DEBUG)
    main()

谢谢,罗恩

zhte4eai

zhte4eai1#

您的客户端代码的主要问题是:用于发起RPC的CompletionQueue实例与您在AsyncCompleteRpc()中轮询的实例不是同一个。(我还没有验证代码的其余部分,但这是我发现的主要问题。)
我建议使用C++回调API而不是基于CQ的API。Callback API使用起来更加直观和直接。你可以在这里找到一些例子-https://github.com/grpc/grpc/tree/master/examples

相关问题