gRPC 和 etcd 客户端

gRPC and etcd client

这个问题涉及 etcd 特定的东西,但我认为这个问题与一般的 gRPC 更相关。 我正在尝试为某些键创建 etcd Watch,因为文档很少我看了一下诺基亚 implementation 根据我的需要调整代码很容易,我想出了第一个运行良好的版本,创建 WatchCreateRequest,并在密钥更新时触发回调。到目前为止,一切都很好。然后我尝试添加多个键来观看。惨败!在这种情况下,ClientAsyncReaderWriter 无法 Read/Write。现在回答问题。

如果我的 class

中有以下成员
Watch::Stub watchStub;
CompletionQueue completionQueue;
ClientContext context;
std::unique_ptr<ClientAsyncReaderWriter<WatchRequest, WatchResponse>> stream;
WatchResponse reply;

并且我想支持添加到我的 class 的多个 Watches,我想我必须为每个手表保留多个变量,而不是作为 class 成员。 首先,我想,WatchResponse reply 应该是每个 Watch 一个。我不太确定 stream,我应该每个 Watch 持有一个吗?我几乎可以肯定 context 可以对所有 Watches 重复使用,并且 100% 确定 stubcompletionQueue 可以对所有 Watches 重复使用。 所以问题是我的猜测对吗?什么是线程安全?没有找到任何文档来描述哪些对象可以安全地从多线程使用以及我必须在哪里同步访问。 任何 link 文档 (not this one) 将不胜感激!

在我将成员拆分为单个成员之前测试代码Watch 属性 (没有正常关机,我知道)

using namespace grpc;
class Watcher
{
public:
    using Callback = std::function<void(const std::string&, const std::string&)>;

    Watcher(std::shared_ptr<Channel> channel) : watchStub(channel)
    {
        stream = watchStub.AsyncWatch(&context, &completionQueue, (void*) "create");
        eventPoller = std::thread([this]() { WaitForEvent(); });
    }

    void AddWatch(const std::string& key, Callback callback)
    {
        AddWatch(key, callback, false);
    }

    void AddWatches(const std::string& key, Callback callback)
    {
        AddWatch(key, callback, true);
    }

private:
    void AddWatch(const std::string& key, Callback callback, bool isRecursive)
    {
        auto insertionResult = callbacks.emplace(key, callback);
        if (!insertionResult.second) {
            throw std::runtime_error("Event handle already exist.");
        }
        WatchRequest watch_req;
        WatchCreateRequest watch_create_req;
        watch_create_req.set_key(key);
        if (isRecursive) {
            watch_create_req.set_range_end(key + "\xFF");
        }

        watch_req.mutable_create_request()->CopyFrom(watch_create_req);
        stream->Write(watch_req, (void*) insertionResult.first->first.c_str());

        stream->Read(&reply, (void*) insertionResult.first->first.c_str());
    }

    void WaitForEvent()
    {
        void* got_tag;
        bool ok = false;

        while (completionQueue.Next(&got_tag, &ok)) {
            if (ok == false) {
                break;
            }
            if (got_tag == (void*) "writes done") {
                // Signal shutdown
            }
            else if (got_tag == (void*) "create") {
            }
            else if (got_tag == (void*) "write") {
            }
            else {

                auto tag = std::string(reinterpret_cast<char*>(got_tag));
                auto findIt = callbacks.find(tag);
                if (findIt == callbacks.end()) {
                    throw std::runtime_error("Key \"" + tag + "\"not found");
                }

                if (reply.events_size()) {
                    ParseResponse(findIt->second);
                }
                stream->Read(&reply, got_tag);
            }
        }
    }

    void ParseResponse(Callback& callback)
    {
        for (int i = 0; i < reply.events_size(); ++i) {
            auto event = reply.events(i);
            auto key = event.kv().key();
            callback(event.kv().key(), event.kv().value());
        }
    }

    Watch::Stub watchStub;
    CompletionQueue completionQueue;
    ClientContext context;
    std::unique_ptr<ClientAsyncReaderWriter<WatchRequest, WatchResponse>> stream;
    WatchResponse reply;
    std::unordered_map<std::string, Callback> callbacks;
    std::thread eventPoller;
};

很抱歉,我不太确定这里 Watch 的设计是否合适。我不太清楚是否要为每个 Watch.

创建一个 gRPC 调用

无论如何,每个 gRPC 调用都会有自己的 ClientContextClientAsyncReaderWriter。但是 stubCompletionQueue 不是每次调用的事情。

据我所知,没有找到线程安全的中心位置类。您可能需要阅读 API 文档以获得正确的期望。

当我写 async server load reporting service 时,我自己添加同步的唯一地方是 CompletionQueue,这样我就不会在 cq 关闭时将新标签加入队列。