gpt4 book ai didi

c++ - gRPC 和 etcd 客户端

转载 作者:行者123 更新时间:2023-11-28 04:36:06 28 4
gpt4 key购买 nike

这个问题涉及 etcd 特定的东西,但我认为这个问题通常与 gRPC 的工作更相关。我正在尝试为一些 key 创建 etcd Watch,因为文档很少我看了一下 Nokia implementation根据我的需要调整代码很容易,我想出了第一个运行良好的版本,创建 WatchCreateRequest,并在 key 更新时触发回调。到目前为止,一切都很好。然后我尝试添加多个键来观看。惨败! ClientAsyncReaderWriter 在这种情况下读取/写入失败。现在回答问题。

如果我类有以下成员

Watch::Stub watchStub;
CompletionQueue completionQueue;
ClientContext context;
std::unique_ptr<ClientAsyncReaderWriter<WatchRequest, WatchResponse>> stream;
WatchResponse reply;

并且我想支持添加到我的类(class)的多个 Watches,我想我必须为每个 watch 保留多个变量,而不是作为类(class)成员。首先,我想,WatchResponse 回复 应该是每个 Watch 一个。我不太确定 stream,我应该为每个 Watch 保留一个吗?我几乎可以肯定 context 可以重用于所有 Watches 并且 100% 确定 stubcompletionQueue 可以可重复用于所有 Watches。所以问题是我的猜测对吗?什么是线程安全?没有找到任何文档来描述哪些对象可以安全地从多线程使用以及我必须在哪里同步访问。任何文档链接 (not this one) 将不胜感激!

在将成员拆分为单个 Watch 属性之前测试代码(没有正常关机,我知道)

using namespace grpc;
class Watcher
{
public:
using Callback = std::function<void(const std::string&, const std::string&)>;

Watcher(std::shared_ptr<Channel> channel) : watchStub(channel)
{
stream = watchStub.AsyncWatch(&context, &completionQueue, (void*) "create");
eventPoller = std::thread([this]() { WaitForEvent(); });
}

void AddWatch(const std::string& key, Callback callback)
{
AddWatch(key, callback, false);
}

void AddWatches(const std::string& key, Callback callback)
{
AddWatch(key, callback, true);
}

private:
void AddWatch(const std::string& key, Callback callback, bool isRecursive)
{
auto insertionResult = callbacks.emplace(key, callback);
if (!insertionResult.second) {
throw std::runtime_error("Event handle already exist.");
}
WatchRequest watch_req;
WatchCreateRequest watch_create_req;
watch_create_req.set_key(key);
if (isRecursive) {
watch_create_req.set_range_end(key + "\xFF");
}

watch_req.mutable_create_request()->CopyFrom(watch_create_req);
stream->Write(watch_req, (void*) insertionResult.first->first.c_str());

stream->Read(&reply, (void*) insertionResult.first->first.c_str());
}

void WaitForEvent()
{
void* got_tag;
bool ok = false;

while (completionQueue.Next(&got_tag, &ok)) {
if (ok == false) {
break;
}
if (got_tag == (void*) "writes done") {
// Signal shutdown
}
else if (got_tag == (void*) "create") {
}
else if (got_tag == (void*) "write") {
}
else {

auto tag = std::string(reinterpret_cast<char*>(got_tag));
auto findIt = callbacks.find(tag);
if (findIt == callbacks.end()) {
throw std::runtime_error("Key \"" + tag + "\"not found");
}

if (reply.events_size()) {
ParseResponse(findIt->second);
}
stream->Read(&reply, got_tag);
}
}
}

void ParseResponse(Callback& callback)
{
for (int i = 0; i < reply.events_size(); ++i) {
auto event = reply.events(i);
auto key = event.kv().key();
callback(event.kv().key(), event.kv().value());
}
}

Watch::Stub watchStub;
CompletionQueue completionQueue;
ClientContext context;
std::unique_ptr<ClientAsyncReaderWriter<WatchRequest, WatchResponse>> stream;
WatchResponse reply;
std::unordered_map<std::string, Callback> callbacks;
std::thread eventPoller;
};

最佳答案

很抱歉,我不太确定这里的 Watch 设计是否合适。我不太清楚是否要为每个 Watch 创建一个 gRPC 调用。

无论如何,每个 gRPC 调用都会有自己的 ClientContextClientAsyncReaderWriter。但是 stubCompletionQueue 不是每次调用的事情。

据我所知,没有找到线程安全类的中心位置。您可能需要阅读 API 文档以获得正确的预期。

当我写 async server load reporting service 的时候,我自己添加同步的唯一地方是 CompletionQueue,这样我就不会在 cq 关闭时将新标签加入队列。

关于c++ - gRPC 和 etcd 客户端,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/51394558/

28 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com