gpt4 book ai didi

c++ - gRPC:长时间运行的流式传输的最佳实践是什么?

转载 作者:行者123 更新时间:2023-12-04 08:59:13 47 4
gpt4 key购买 nike

我们已经实现了一个在云中运行的 Java gRPC 服务,具有单向(客户端到服务器)流式 RPC,如下所示:

rpc PushUpdates(stream Update) returns (Ack);

C++ 客户端(移动设备)在启动后立即调用此 rpc,每 30 秒左右连续发送一次更新,只要设备启动并运行,就会永久发送。

ChannelArguments chan_args;
// this will be secure channel eventually
auto channel_p = CreateCustomChannel(remote_addr, InsecureChannelCredentials(), chan_args);
auto stub_p = DialTcc::NewStub(channel_p);
// ...

Ack ack;
auto strm_ctxt_p = make_unique<ClientContext>();
auto strm_p = stub_p->PushUpdates(strm_ctxt_p.get(), &ack);
// ...

While(true) {
// wait until we are ready to send a new update
Update updt;
// populate updt;
if(!strm_p->Write(updt)) {
// stream is not kosher, create a new one and restart
break;
}
}

现在发生这种情况时会发生不同类型的网络中断:

  • 在云中运行的 gRPC 服务可能会停机(进行维护)或者可能只是变得无法访问。
  • 设备本身的 ip 地址不断变化,因为它是移动设备。

我们已经看到,在此类事件中, channel 和 Write() API 都无法可靠地检测到网络断开。有时客户端不断调用 Write()(它不返回 false)但服务器没有收到任何数据(wireshark 没有显示任何事件)客户端设备的传出端口)。

在这种情况下恢复的最佳做法是什么,以便服务器在此类事件发生后 X 秒 内开始接收更新?可以理解的是,每当发生此类事件时,都会丢失 X 秒 有值(value)的数据,但我们希望在 X 秒内可靠地恢复。

gRPC 版本:1.30.2,客户端:C++-14/Linux,服务器:Java/Linux

最佳答案

以下是我们破解它的方法。我想检查这是否可以做得更好,或者 gRPC 的任何人都可以指导我找到更好的解决方案。

我们服务的 protobuf 看起来像这样。它有一个用于 ping 服务的 RPC,经常用于测试连接。

// Message used in IsAlive RPC
message Empty {}

// Acknowledgement sent by the service for updates received
message UpdateAck {}

// Messages streamed to the service by the client
message Update {
...
...
}

service GrpcService {
// for checking if we're able to connect
rpc Ping(Empty) returns (Empty);

// streaming RPC for pushing updates by client
rpc PushUpdate(stream Update) returns (UpdateAck);
}

这是 c++ 客户端的外观,它执行以下操作:

  • 连接():

    • 如果 stub 是 nullptr,则创建用于调用 RPC 的 stub 。
    • 定期调用 Ping() 直到成功。
    • 成功调用 PushUpdate(...) RPC 以创建新流。
    • 失败时将流重置为 nullptr
  • Stream():执行以下 while(true) 循环:

    • 获取要推送的更新。
    • 在要推送更新的流上调用 Write(...)
    • 如果 Write(...) 由于任何原因中断并且控制返回到 Connect()
    • 每 30 分钟(或某个固定时间间隔)一次,将所有内容( stub 、 channel 、流)重置为 nullptr 以重新开始。这是必需的,因为有时即使客户端和服务之间没有连接,Write(...) 也不会失败。 Write(...) 调用成功,但客户端上的传出端口未显示 wireshark 上的任何事件!

代码如下:

constexpr GRPC_TIMEOUT_S = 10;
constexpr RESTART_INTERVAL_M = 15;
constexpr GRPC_KEEPALIVE_TIME_MS = 10000;
string root_ca, tls_key, tls_cert; // for SSL
string remote_addr = "https://remote.com:5445";
...
...
void ResetStreaming() {
if (stub_p) {
if (strm_p) { // graceful restart/stop, this pair of API are called together, in this order
if (!strm_p->WritesDone()) {
// Log a message
}
strm_p->Finish(); // Log if return value of this is NOT grpc::OK
}
strm_p = nullptr;
strm_ctxt_p = nullptr;
stub_p = nullptr;
channel_p = nullptr;
}
}

void CreateStub() {
if (!stub_p) {
ChannelArguments chan_args;
chan_args.SetInt(GRPC_ARG_KEEPALIVE_TIME_MS, GRPC_KEEPALIVE_TIME_MS);
channel_p = CreateCustomChannel(
remote_addr,
SslCredentials(SslCredentialsOptions{root_ca, tls_key, tls_cert}),
chan_args);
stub_p = GrpcService::NewStub(m_channel_p);
}
}

void Stream() {
const auto restart_time = steady_clock::now() + minutes(RESTART_INTERVAL_M);
while (!stop) {
// restart every RESTART_INTERVAL_M (15m) even if ALL IS WELL!!
if (steady_clock::now() > restart_time) {
break;
}
Update updt = GetUpdate(); // get the update to be sent
if (!stop) {
if (channel_p->GetState(true) == GRPC_CHANNEL_SHUTDOWN ||
!strm_p->Write(updt)) {
// could not write!!
return; // we will Connect() again
}
}
}
// stopped due to stop = true or interval to create new stream has expired
ResetStreaming(); // channel, stub, stream are recreated once in every 15m
}

bool PingRemote() {
ClientContext ctxt;
ctxt.set_deadline(system_clock::now() + seconds(GRPC_TIMEOUT_S));
Empty req, resp;
CreateStub();
if (stub_p->Ping(&ctxt, req, &resp).ok()) {
static UpdateAck ack;
strm_ctxt_p = make_unique<ClientContext>(); // need new context
strm_p = stub_p->PushUpdate(strm_ctxt_p.get(), &ack);
return true;
}
if (strm_p) {
strm_p = nullptr;
strm_ctxt_p = nullptr;
}
return false;
}

void Connect() {
while (!stop) {
if (PingRemote() || stop) {
break;
}
sleep_for(seconds(5)); // wait before retrying
}
}

// set to true from another thread when we want to stop
atomic<bool> stop = false;

void StreamUntilStopped() {
if (stop) {
return;
}
strm_thread_p = make_unique<thread>([&] {
while (!stop) {
Connect();
Stream();
}
});
}

// called by the thread that sets stop = true
void Finish() {
strm_thread_p->join();
}

据此,我们发现无论何时因任何原因出现中断,流媒体都会在 15 分钟(或RESTART_INTERVAL_M)内恢复。此代码以快速路径运行,所以我很想知道是否可以做得更好。

关于c++ - gRPC:长时间运行的流式传输的最佳实践是什么?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/63633332/

47 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com