如何在C++中用etcd实现grpc解析器?

eivgtgni  于 2023-05-30  发布在  Etcd
关注(0)|答案(1)|浏览(372)

2个问题:
1.我已经建立和安装grpc与指南https://github.com/grpc/grpc/blob/master/BUILDING.md,grpc标签是v1.41.0.但是我在/usr/local/include/grpc++/中找不到resolver.h和balancer.h,但是在grpc源代码中,它在grpc/src/core/lib/resolver/中。
1.如何将端点字符串转换为某种grpc结果结构,并将结果设置为grpc?找不到例子
下面是我的代码:

#pragma once
#include <grpc/grpc.h>
#include <grpcpp/grpcpp.h>
#include <grpcpp/client_context.h>
#include <grpcpp/create_channel.h>
#include <grpcpp/support/channel_arguments.h>

#include <thread>
#include <mutex>
#include <condition_variable>
#include <vector>

#include "etcd-cpp-apiv3/Client.hpp"
#include "etcd-cpp-apiv3/Response.hpp"
#include "etcd-cpp-apiv3/Watcher.hpp"

class EtcdResolver : public grpc_core::Resolver {
public:
    explicit EtcdResolver(const std::string& etcd_address, const std::string& service_name, 
                            const std::string& runtime, const std::string& ca, const std::string& cert, const std::string& prikey)
    {
        etcd::Client* etcd = etcd::Client::WithSSL(etcd_address, ca, cert, prikey);
        if (etcd == nullptr)
        {
            printf("create xetcd client fail\n");
            exit(1);
        }
        std::shared_ptr<etcd::Client> shared_client(etcd);
        client_ = shared_client;
        runtime_ = runtime;
        etcd_address_ = etcd_address;
        service_name_ = service_name;
        ca_ = ca;
        cert_ = cert;
        prikey_ = prikey;
    };

    ~EtcdResolver(){};

    void StartLocked()
    {
        std::vector<std::string> endpoints = FirstGetEndpointsFromEtcd();
        grpc::ChannelArguments args;
        args.SetServiceConfigJSON("{\"loadBalancingConfig\":[{\"round_robin\":{}}]}");
    
        for (const auto& endpoint : endpoints) {
            // how to convert string to grpc_core::Resolver::Result ?
        }
        UpdateStateLocked(result_);

        etcd::Watcher watcher(*client_.get(), service_key_, std::bind(&EtcdResolver::OnWatch, this, std::placeholders::_1), true);
        std::shared_ptr<etcd::Watcher> shared_watcher(&watcher);
        watcher_ = shared_watcher;
    };

    void RequestReresolution(){};

    void ResetBackoff(){};

    void UpdateStateLocked(grpc_core::Resolver::Result result)
    {
        // how to set the result into grpc?
    };

private:
    std::vector<std::string> FirstGetEndpointsFromEtcd()
    {
        std::vector<std::string> endpoints;
        service_key_ = std::string("services_prefix/") + runtime_ + "/" + service_name_ + "/";
        auto resp = client_->ls(service_key_).get();
        if (resp.error_code() != 0) {
            throw std::runtime_error(resp.error_message());
        }
        for (const auto& val : resp.values()) {
            Json::Reader reader;
            Json::Value value;
            if (reader.parse(val.as_string(), value)){
                endpoints.emplace_back(value["addr"].asString());
            }
        }
        return endpoints;
    };

    void OnWatch(etcd::Response const & resp)
    {
        if (!resp.is_ok()) return;
        std::cout << "OnWatch triggered" << std::endl;
        auto resp2 = client_->ls(service_key_).get();
        if (resp2.error_code() != 0) {
            throw std::runtime_error(resp2.error_message());
        }
        std::unique_lock<std::mutex> lock(mutex_);
        std::vector<std::string> endpoints;
        for (const auto& val : resp2.values()) {
            Json::Reader reader;
            Json::Value value;
            if (reader.parse(val.as_string(), value)){
                endpoints.emplace_back(value["addr"].asString());
            }
        }
        for (const auto& endpoint : endpoints) {
            // how to convert string to grpc_core::Resolver::Result ?
        }
        std::unique_lock<std::mutex> unlock(mutex_);
        UpdateStateLocked(result_);
    };

private:
    std::string runtime_;
    std::string etcd_address_;
    std::string service_name_;
    std::shared_ptr<etcd::Client> client_;
    std::shared_ptr<etcd::Watcher> watcher_;
    grpc_core::Resolver::Result result_;
    std::mutex mutex_;

    std::string ca_;
    std::string cert_;
    std::string prikey_;
    std::string service_key_;
};

我试着阅读grpc src代码,发现下面,不能确保它的权利?

// convert string to grpc_core::Resolver::Result
grpc_resolved_address resolved_address;
memset(&resolved_address, 0, sizeof(grpc_resolved_address));

absl::StatusOr<grpc_core::URI> uri = grpc_core::URI::Parse(endpoint);
grpc_parse_uri(uri, &resolved_address);
result_.addresses.emplace_back(grpc_core::ServerAddress(resolved_address, nullptr));

仍然无法找到一种方法将Result设置为grpc内部

js81xvg6

js81xvg61#

不幸的是,客户端通道解析器不是公共的C或C++ API,应用程序无法提供自己的。此外,不建议进入gRPC代码库内部(src/下的所有内容),那里没有API稳定性保证,您的集成将定期中断。公共的、稳定的API都在include/文件夹下。

相关问题