gpt4 book ai didi

c++ - grpc c++中的异步模型

转载 作者:行者123 更新时间:2023-12-01 12:07:25 32 4
gpt4 key购买 nike

我的团队正在设计一个具有微服务架构的可扩展解决方案,并计划使用 gRPC 作为层之间的传输通信。我们决定使用异步 grpc 模型。如果我扩展 RPC 方法的数量,示例(greeter_async_server.cc)提供的设计似乎不可行,因为我必须为每个 RPC 方法创建一个新类,并在 HandleRpcs() 中创建它们的对象。像这样。
Pastebin (简短的示例代码)。

   void HandleRpcs() {
new CallDataForRPC1(&service_, cq_.get());
new CallDataForRPC2(&service_, cq_.get());
new CallDataForRPC3(&service, cq_.get());
// so on...
}
它将被硬编码,所有的灵 active 都将丢失。
我有大约 300-400 个 RPC 方法来实现,当我必须处理超过 100K 的 RPC 请求/秒时,拥有 300-400 个类会很麻烦而且效率很低,而且这个解决方案是一个非常糟糕的设计。我无法承受在每个请求上以这种方式创建对象的开销。有人可以为我提供一个解决方法吗?可以异步grpc c++不像它的同步伴侣那么简单?
编辑 :为了让情况更清楚,对于那些可能难以掌握这个异步示例流程的人,我正在写到目前为止我所理解的内容,如果在某处有错误,请纠正我。
在异步 grpc 中,每次我们必须将唯一标签与完成队列绑定(bind),这样当我们轮询时,服务器可以在特定的 RPC 将被客户端命中时将其返回给我们,我们从返回的关于调用类型的唯一标签。 service_->RequestRPC2(&ctx_, &request_, &responder_, cq_, cq_,this);这里我们使用当前对象的地址作为唯一标签。这就像在完成队列上注册我们的 RPC 调用。然后我们在 HandleRPCs() 中进行投票。查看客户端是否命中 RPC,如果是,则 cq_->Next(&tag, &OK)将填充标签。轮询代码片段:
while (true) {
GPR_ASSERT(cq_->Next(&tag, &ok));
GPR_ASSERT(ok);
static_cast<CallData*>(tag)->Proceed();
}
因为,我们注册到队列中的唯一标签是 CallData 对象的地址,所以我们可以调用 Proceed() .这对于一个逻辑在 Proceed() 内部的 RPC 来说很好。 .但是每次都有更多的 RPC,我们会将它们全部放在 CallData 中,然后在轮询时,我们将调用唯一的 Proceed()这将包含(比如)RPC1(postgres 调用)、RPC2(mongodb 调用)等的逻辑。这就像在一个函数中编写我的所有程序一样。因此,为避免这种情况,我使用了 GenericCallData virtual void Proceed() 的类并从中生成派生类,每个 RPC 一个类,在自己的 Proceed() 中具有自己的逻辑.这是一个可行的解决方案,但我想避免编写很多类。
我尝试的另一个解决方案是将所有 RPC 功能逻辑排除在 proceed() 之外。并融入自己的功能和维护全局 std::map<long, std::function</*some params*/>> .因此,每当我将带有唯一标签的 RPC 注册到队列中时,我都会存储其相应的逻辑函数(我肯定会将其硬编码到语句中并绑定(bind)所有需要的参数),然后将唯一标签作为键。在投票中,当我得到 &tag我在 map 中查找此键并调用相应的保存函数。现在,还有一个障碍,我必须在函数逻辑中执行此操作:
// pseudo code
void function(reply, responder, context, service)
{
// register this RPC with another unique tag so to serve new incoming request of the same type on the completion queue
service_->RequestRPC1(/*params*/, new_unique_id);
// now again save this new_unique_id and current function into the map, so when tag will be returned we can do lookup
map.emplace(new_unique_id, function);

// now you're free to do your logic
// do your logic
}
您会看到,代码现在已经传播到另一个模块中,并且它是基于 RPC 的。
希望它能清除情况。
我想是否有人可以以更简单的方式实现这种类型的服务器。

最佳答案

这篇文章现在已经很老了,但我还没有看到任何关于这个的答案或例子,所以我将向任何其他读者展示我是如何解决它的。我有大约 30 个 RPC 调用,并且正在寻找一种在添加和删除 RPC 调用时减少占用空间的方法。我花了一些迭代来找出解决它的好方法。
因此,我从 (g)RPC 库中获取 RPC 请求的接口(interface)是接收方需要实现的回调接口(interface)。界面如下所示:

class IRpcRequestHandler
{
public:
virtual ~IRpcRequestHandler() = default;
virtual void onZigbeeOpenNetworkRequest(const smarthome::ZigbeeOpenNetworkRequest& req,
smarthome::Response& res) = 0;
virtual void onZigbeeTouchlinkDeviceRequest(const smarthome::ZigbeeTouchlinkDeviceRequest& req,
smarthome::Response& res) = 0;
...
};
还有一些用于在 gRPC 服务器启动后设置/注册每个 RPC 方法的代码:
void ready() 
{
SETUP_SMARTHOME_CALL("ZigbeeOpenNetwork", // Alias that is used for debug messages
smarthome::Command::AsyncService::RequestZigbeeOpenNetwork, // Generated gRPC service method for async.
smarthome::ZigbeeOpenNetworkRequest, // Generated gRPC service request message
smarthome::Response, // Generated gRPC service response message
IRpcRequestHandler::onZigbeeOpenNetworkRequest); // The callback method to call when request has arrived.

SETUP_SMARTHOME_CALL("ZigbeeTouchlinkDevice",
smarthome::Command::AsyncService::RequestZigbeeTouchlinkDevice,
smarthome::ZigbeeTouchlinkDeviceRequest,
smarthome::Response,
IRpcRequestHandler::onZigbeeTouchlinkDeviceRequest);
...
}
这就是您在添加和删除 RPC 方法时需要关心的全部内容。
SETUP_SMARTHOME_CALL 是一个自制的宏,如下所示:
#define SETUP_SMARTHOME_CALL(ALIAS, SERVICE, REQ, RES, CALLBACK_FUNC) \
new ServerCallData<REQ, RES>( \
ALIAS, \
std::bind(&SERVICE, \
&mCommandService, \
std::placeholders::_1, \
std::placeholders::_2, \
std::placeholders::_3, \
std::placeholders::_4, \
std::placeholders::_5, \
std::placeholders::_6), \
mCompletionQueue.get(), \
std::bind(&CALLBACK_FUNC, requestHandler, std::placeholders::_1, std::placeholders::_2))
我认为 ServerCallData 类看起来像 gRPC 示例中的一个,但做了一些修改。 ServerCallData 派生自具有抽象函数的非模板类 void proceed(bool ok)用于 CompletionQueue::Next() 处理。当创建 ServerCallData 时,它将调用 SERVICE在 CompletionQueue 和每个第一个 proceed(ok) 上注册自己的方法调用,它将克隆自己,这将注册另一个实例。如果有人感兴趣,我也可以为此发布一些示例代码。
编辑:在下面添加了更多示例代码。
Grpc服务器
class GrpcServer
{
public:
explicit GrpcServer(std::vector<grpc::Service*> services);
virtual ~GrpcServer();

void run(const std::string& sslKey,
const std::string& sslCert,
const std::string& password,
const std::string& listenAddr,
uint32_t port,
uint32_t threads = 1);

private:
virtual void ready(); // Called after gRPC server is created and before polling CQ.
void handleRpcs(); // Function that polls from CQ, can be run by multiple threads. Casts object to CallData and calls CallData::proceed().

std::unique_ptr<ServerCompletionQueue> mCompletionQueue;
std::unique_ptr<Server> mServer;
std::vector<grpc::Service*> mServices;
std::list<std::shared_ptr<std::thread>> mThreads;
...
}
CallData的主要部分目的:
template <typename TREQUEST, typename TREPLY>
class ServerCallData : public ServerCallMethod
{
public:
explicit ServerCallData(const std::string& methodName,
std::function<void(ServerContext*,
TREQUEST*,
::grpc::ServerAsyncResponseWriter<TREPLY>*,
::grpc::CompletionQueue*,
::grpc::ServerCompletionQueue*,
void*)> serviceFunc,
grpc::ServerCompletionQueue* completionQueue,
std::function<void(const TREQUEST&, TREPLY&)> callback,
bool first = false)
: ServerCallMethod(methodName),
mResponder(&mContext),
serviceFunc(serviceFunc),
completionQueue(completionQueue),
callback(callback)
{
requestNewCall();
}

void proceed(bool ok) override
{
if (!ok)
{
delete this;
return;
}

if (callStatus() == ServerCallMethod::PROCESS)
{
callStatus() = ServerCallMethod::FINISH;
new ServerCallData<TREQUEST, TREPLY>(callMethodName(), serviceFunc, completionQueue, callback);

try
{
callback(mRequest, mReply);
}
catch (const std::exception& e)
{
mResponder.Finish(mReply, Status::CANCELLED, this);
return;
}

mResponder.Finish(mReply, Status::OK, this);
}
else
{
delete this;
}
}

private:
void requestNewCall()
{
serviceFunc(
&mContext, &mRequest, &mResponder, completionQueue, completionQueue, this);
}

ServerContext mContext;
TREQUEST mRequest;
TREPLY mReply;
ServerAsyncResponseWriter<TREPLY> mResponder;
std::function<void(ServerContext*,
TREQUEST*,
::grpc::ServerAsyncResponseWriter<TREPLY>*,
::grpc::CompletionQueue*,
::grpc::ServerCompletionQueue*,
void*)>
serviceFunc;
std::function<void(const TREQUEST&, TREPLY&)> callback;
grpc::ServerCompletionQueue* completionQueue;
};

关于c++ - grpc c++中的异步模型,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/49318889/

32 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com