我正在尝试使用 gRPC(C++)创建一个异步客户端。我希望客户端每秒打印一个点 '.';当它从服务器收到某个符号(在我的例子中是回车符 \n)时,打印一条消息说明已经收到了该符号。只要还没有收到该符号,它就继续循环并打印点。我下面的代码不能正常工作:它确实会打印点,但识别不到来自服务器的符号:
客户端:
#include <stdlib.h>
#include <unistd.h>

#include <chrono>
#include <iostream>
#include <memory>
#include <string>
#include <thread>

#include <grpcpp/grpcpp.h>

#include "dts.grpc.pb.h"

using grpc::Channel;
using grpc::ClientContext;
using grpc::ClientAsyncResponseReader;
using grpc::Status;
using grpc::CompletionQueue;
class DTSClient{
public:
explicit DTSClient(std::string socket);
~DTSClient() {}
void run() {
while(true){
EntryCreateRequest request;
ClientContext context;
EntryResponse response;
CompletionQueue cq;
Status status;
unsigned int microsecond = 1000000;
usleep(1 * microsecond); //sleeps for 1 second
std::cout << "." << std::endl;
auto reader = stub_->PrepareAsyncrun_stream(&context, request, &cq);
reader->StartCall();
std::cout << *(response.title().c_str()) <<std::endl;
if (*(response.title().c_str()) == '\n') {
std::cout << "I got enter" << std::endl;
std::cout << "\n" << std::endl;
}
reader->Finish(&response, &status, (void*)1);
}
}
void AsyncCompleteRpc() {
void* got_tag;
bool ok = false;
CompletionQueue cq;
EntryResponse response;
while (cq.Next(&got_tag, &ok)) {
if (ok) {
GPR_ASSERT(got_tag == (void*)1);
std::cout << "Greeter received: " << response.title() << std::endl;
} else {
std::cout << "RPC failed" << std::endl;
}
}
}
private:
std::shared_ptr<grpc::Channel> channel_;
std::unique_ptr<DTSService::Stub> stub_;
};
/// Connects to the server over a Unix domain socket.
///
/// @param socket_file Filesystem path of the socket; it is prefixed with
///                    "unix://" to form the gRPC target. The channel uses
///                    insecure credentials, and the DTSService stub is
///                    created on that channel.
DTSClient::DTSClient(std::string socket_file)
    : channel_(grpc::CreateChannel("unix://" + socket_file,
                                   grpc::InsecureChannelCredentials())),
      stub_(DTSService::NewStub(channel_)) {}
int main(int argc, char** argv) {
DTSClient dts_client("/tmp/grpc.sock");
std::cout << "Client start" << std::endl;
std::thread thread_ = std::thread(&DTSClient::AsyncCompleteRpc, &dts_client);
dts_client.run();
thread_.join();
return 0;
}
服务器:
# import required libraries & proto defn.
import grpc
from concurrent import futures
import sys
import os
import logging
import time
import subprocess
import asyncio
sys.path.append("/labhome/rdomnitz/workspace/collectx/Collectx/services/dts/proto")
import dts_pb2_grpc, dts_pb2
# Filesystem path of the Unix domain socket the server listens on; main()
# prefixes it with "unix://" when binding the port.
SERVER_ADDRESS = "/tmp/grpc.sock"
class Listener(dts_pb2_grpc.DTSServiceServicer):
    """Servicer implementing the DTSService RPC handlers."""

    # Fallback (std_out, std_err, rc, timeout_expired) used by run_command
    # when the command fails or times out.
    # NOTE(review): this constant was referenced but never defined; the values
    # here are a placeholder -- confirm them against the RunCommandResponse
    # field types in the proto.
    _TIMEOUT_COMMAND_RESPONSE_TUPLE = ("", "", -1, True)

    def run_command(self, request, context):
        """Run ``request.command`` in a shell and return its captured output.

        Args:
            request: message providing ``command`` and ``timeout``.
            context: gRPC servicer context (unused).

        Returns:
            dts_pb2.RunCommandResponse with ``std_out``, ``std_err``, ``rc``
            and ``timeout_expired``. If running the command raises (for
            example on timeout), the child is killed and the values come from
            ``_TIMEOUT_COMMAND_RESPONSE_TUPLE``.
        """
        cmd = request.command
        timeout = request.timeout
        logging.debug("run_command: %s", cmd)
        # SECURITY NOTE(review): shell=True executes a caller-supplied string
        # through the shell; make sure only trusted clients can reach this RPC.
        proc = subprocess.Popen(cmd, shell=True,
                                stdout=subprocess.PIPE,
                                stderr=subprocess.PIPE,
                                universal_newlines=True,
                                stdin=subprocess.PIPE)
        try:
            std_out, std_err = proc.communicate(timeout=timeout)
            rcode = proc.returncode
            timeout_expired = False
        except Exception as exp:
            proc.kill()
            # Reap the killed child and close its pipes so it does not linger
            # as a zombie process.
            proc.communicate()
            logging.error(f"run_command: {cmd} failed with error: {exp}")
            std_out, std_err, rcode, timeout_expired = self._TIMEOUT_COMMAND_RESPONSE_TUPLE
        return dts_pb2.RunCommandResponse(std_out=std_out, std_err=std_err,
                                          rc=rcode, timeout_expired=timeout_expired)

    def run_stream(self, request, context):
        """Block until one character is read from stdin and send it as ``title``.

        NOTE(review): this handler is a generator (``yield``), which is the
        shape gRPC expects for a server-streaming method, while the C++ client
        uses the unary async API -- confirm against the .proto definition.
        """
        print("Wait for input")
        char = sys.stdin.read(1)
        yield dts_pb2.EntryResponse(title=char)
def main():
    """Start the DTSService gRPC server on the Unix socket and serve until
    terminated.

    A KeyboardInterrupt (Ctrl-C) stops the server immediately.
    """
    # Bound before the try block so the KeyboardInterrupt handler can never
    # reference an unassigned name if the interrupt arrives during setup.
    server = None
    try:
        server = grpc.server(futures.ThreadPoolExecutor())
        dts_pb2_grpc.add_DTSServiceServicer_to_server(Listener(), server)
        server.add_insecure_port(f"unix://{SERVER_ADDRESS}")
        logging.debug("start server")
        server.start()
        server.wait_for_termination()
    except KeyboardInterrupt:
        print("KeyboardInterrupt")
        if server is not None:
            server.stop(0)
# Script entry point: turn on debug-level logging, then start the server.
if __name__ == "__main__":
    logging.basicConfig(level=logging.DEBUG)
    main()
谢谢,罗恩
1条答案
按热度按时间zhte4eai1#
您客户端代码的主要问题在于:用于发起 RPC 的 CompletionQueue 实例,与您在 AsyncCompleteRpc() 中驱动的实例不是同一个。(我没有验证代码的其余部分,但这是我发现的主要问题。)我建议使用 C++ 回调 API,而不是基于 CQ 的 API;回调 API 使用起来更加直观和直接。您可以在这里找到一些示例:https://github.com/grpc/grpc/tree/master/examples