
c++ - RPC failure code 14 in our simple GRPC example program


We have been making good progress getting GRPC running under RHEL 7.
Our application has a fairly complicated structure with three levels of nesting, where the outer level uses the "oneof" keyword.
We have found that all the other structures work fine, but this one gives us an RPC failure with code = 14.
We have stripped this part of the application down as far as we can, so it should be easy to recompile and run.

Here is the .proto file, updated to address Uli's question:

syntax = "proto3";

option java_multiple_files = true;
option java_package = "io.grpc.examples.debug";
option java_outer_classname = "DebugProto";
option objc_class_prefix = "DEBUG";

package DEBUGpackage;

service DEBUGservice {
  rpc DEBUG_val_container_get (input_int32_request) returns (outer_container) {}
}

message input_int32_request {
  int32 ival = 1;
}

message inner_container {
  repeated uint32 val_array = 1;
}

message middle_container {
  inner_container vac = 1;
}

message other_container {
  int32 other_val = 1;
}

message outer_container {
  oneof reply {
    middle_container r1 = 1;
    other_container r2 = 2;
  }
}

(Note that the java lines in this proto code are only there because they appear in the examples on the GRPC website. Our code is entirely C++, with no Java, so we are not sure whether that means we could drop the "option java..." lines.)

Here is our client source code:

#include <iostream>
#include <memory>
#include <string>

#include <grpc++/grpc++.h>
#include <grpc/support/log.h>
#include <thread>
#include <unistd.h>

#include "debug.grpc.pb.h"

using grpc::Channel;
using grpc::ClientAsyncResponseReader;
using grpc::ClientContext;
using grpc::CompletionQueue;
using grpc::Status;
using DEBUGpackage::input_int32_request;
using DEBUGpackage::inner_container;
using DEBUGpackage::middle_container;
using DEBUGpackage::outer_container;
using DEBUGpackage::DEBUGservice;

class DEBUGClient {
 public:
  explicit DEBUGClient(std::shared_ptr<Channel> channel)
      : stub_(DEBUGservice::NewStub(channel)) {}

  void DEBUG_val_container_get() {
    std::cout << "in DEBUG_val_container_get" << std::endl;
    // Data we are sending to the server
    input_int32_request val;
    val.set_ival(0);
    AsyncClientCall* call = new AsyncClientCall;
    call->response_reader = stub_->AsyncDEBUG_val_container_get(&call->context, val, &cq_);
    call->response_reader->Finish(&call->reply_, &call->status, (void*)call);
  }

  void AsyncCompleteRpc() {
    void* got_tag;
    bool ok = false;

    while (cq_.Next(&got_tag, &ok)) {
      AsyncClientCall* call = static_cast<AsyncClientCall*>(got_tag);
      GPR_ASSERT(ok);
      if (call->status.ok()) {
        if (call->reply_.has_r1()) {
          std::cout << call << " DEBUG received: "
                    << call->reply_.r1().vac().val_array(0) << std::endl;
        }
      } else {
        std::cout << call << " RPC failed" << std::endl;
        std::cout << " RPC failure code = " << call->status.error_code() << std::endl;
        std::cout << " RPC failure message = " << call->status.error_message() << std::endl;
      }
      delete call;
    }
  }

 private:
  struct AsyncClientCall {
    outer_container reply_;
    ClientContext context;
    Status status;
    std::unique_ptr<ClientAsyncResponseReader<outer_container>> response_reader;
  };

  std::unique_ptr<DEBUGservice::Stub> stub_;
  CompletionQueue cq_;
};

int main(int argc, char** argv) {
  DEBUGClient DEBUG0(grpc::CreateChannel("172.16.17.46:50050", grpc::InsecureChannelCredentials()));
  std::thread thread0_ = std::thread(&DEBUGClient::AsyncCompleteRpc, &DEBUG0);
  DEBUG0.DEBUG_val_container_get();
  sleep(1);
  std::cout << "Press control-c to quit" << std::endl << std::endl;
  thread0_.join();  // blocks forever
  return 0;
}

And here is our server source code:

#include <memory>
#include <iostream>
#include <string>
#include <thread>

#include <grpc++/grpc++.h>
#include <grpc/support/log.h>

#include "debug.grpc.pb.h"

#include <time.h>
#include <stdlib.h>
#include <unistd.h>
#include <stdio.h>

using grpc::Server;
using grpc::ServerAsyncResponseWriter;
using grpc::ServerBuilder;
using grpc::ServerContext;
using grpc::ServerCompletionQueue;
using grpc::Status;
using DEBUGpackage::inner_container;
using DEBUGpackage::input_int32_request;
using DEBUGpackage::middle_container;
using DEBUGpackage::outer_container;
using DEBUGpackage::DEBUGservice;

std::string save_server_address;

class ServerImpl final {
 public:
  ~ServerImpl() {
    server_->Shutdown();
    cq_->Shutdown();
  }

  void Run() {
    std::string server_address("0.0.0.0:50050");
    ServerBuilder builder;
    builder.AddListeningPort(server_address, grpc::InsecureServerCredentials());
    builder.RegisterService(&service_);
    cq_ = builder.AddCompletionQueue();
    server_ = builder.BuildAndStart();
    std::cout << "Server listening on " << server_address << std::endl;
    save_server_address = server_address;
    HandleRpcs();
  }

 private:
  class CallData {
   public:
    virtual void Proceed() = 0;
  };

  class DebugGetCallData final : public CallData {
   public:
    DebugGetCallData(DEBUGservice::AsyncService* service, ServerCompletionQueue* cq)
        : service_(service), cq_(cq), responder_(&ctx_), status_(CREATE) {
      Proceed();
    }

    void Proceed() {
      if (status_ == CREATE) {
        status_ = PROCESS;
        service_->RequestDEBUG_val_container_get(&ctx_, &request_, &responder_, cq_, cq_, this);
      } else if (status_ == PROCESS) {
        new DebugGetCallData(service_, cq_);
        char *portchar;
        portchar = (char *) save_server_address.c_str();
        long cq_addr = (long) cq_;
        int cq_addr32 = (int) (cq_addr & 0xfffffff);
        srand(cq_addr32);
        fprintf(stderr, "%s task started\n", portchar); fflush(stderr);
        unsigned int return_val = 10;
        inner_container ic;
        ic.add_val_array(return_val);
        middle_container reply_temp;
        reply_temp.set_allocated_vac(&ic);
        reply_.set_allocated_r1(&reply_temp);
        fprintf(stderr, "%s %s task done\n", portchar, "val_container_get"); fflush(stderr);
        status_ = FINISH;
        responder_.Finish(reply_, Status::OK, this);
      } else {
        GPR_ASSERT(status_ == FINISH);
      }
    }

   private:
    DEBUGservice::AsyncService* service_;
    ServerCompletionQueue* cq_;
    ServerContext ctx_;
    input_int32_request request_;
    outer_container reply_;
    ServerAsyncResponseWriter<outer_container> responder_;
    enum CallStatus { CREATE, PROCESS, FINISH };
    CallStatus status_;
  };

  void HandleRpcs() {
    new DebugGetCallData(&service_, cq_.get());
    void* tag;
    bool ok;
    while (true) {
      GPR_ASSERT(cq_->Next(&tag, &ok));
      GPR_ASSERT(ok);
      static_cast<CallData*>(tag)->Proceed();
    }
  }

  std::unique_ptr<ServerCompletionQueue> cq_;
  DEBUGservice::AsyncService service_;
  std::unique_ptr<Server> server_;
};

int main() {
  ServerImpl server;
  server.Run();
  return 0;
}

The output when it runs looks like this:

[fossum@netsres46 debug]$ DEBUG_client2
in DEBUG_val_container_get
0xb73ff0 RPC failed
RPC failure code = 14
RPC failure message = Endpoint read failed
Press control-c to quit

We ran the server under gdb and found a spot in the generated file "debug.pb.cc" where, if we comment out just one line, everything starts working.

Here is the relevant part of the generated file "debug.pb.cc":

middle_container::~middle_container() {
  // @@protoc_insertion_point(destructor:DEBUGpackage.middle_container)
  SharedDtor();
}

void middle_container::SharedDtor() {
  if (this != internal_default_instance()) {
    delete vac_;  // comment out this one line, to make the problem go away
  }
}

The "delete vac_" line appears to be trying to delete storage that has already been deleted, or is about to be deleted somewhere else. Could someone please look into this? [The files above are still the files we are using to generate this code and to debug the problem.]
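
In other words, our suspicion is that the generated set_allocated_*() setters transfer ownership of the pointer to the enclosing message, which then deletes it in its own destructor. A minimal illustration of that rule (hypothetical code, not part of our program; it assumes only the generated header debug.pb.h):

#include "debug.pb.h"

void ownership_demo() {
  DEBUGpackage::inner_container ic;   // automatic (stack) storage
  DEBUGpackage::middle_container mc;
  mc.set_allocated_vac(&ic);          // mc now believes it owns &ic
}                                     // ~middle_container runs "delete vac_"
                                      // on a stack address: undefined behavior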

I don't know whether I have found a bug in GRPC or whether I have written bad code.

Best Answer

The problem is that you are allocating middle_container reply_temp on the stack in your server. As a result, it gets destroyed as soon as it goes out of scope. At that point you have already called Finish, but you have not yet waited for its outcome. Since this is an asynchronous server, the data must stay alive until you receive the tag back. That is why manually editing the destructor works in your case: you are essentially turning the destructor into a no-op (and leaking memory as a result).
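
One way to avoid this (a sketch of the PROCESS branch only, assuming the rest of DebugGetCallData stays exactly as posted) is to let reply_, which is a member of the call object and therefore outlives Finish, allocate its nested messages itself through the generated mutable_*() accessors instead of adopting stack objects via set_allocated_*():

} else if (status_ == PROCESS) {
  new DebugGetCallData(service_, cq_);

  // reply_ is a member of DebugGetCallData, so it stays alive until the
  // FINISH tag comes back. mutable_r1()/mutable_vac() make reply_ allocate
  // and own the nested messages, so no stack object is ever adopted.
  unsigned int return_val = 10;
  inner_container* ic = reply_.mutable_r1()->mutable_vac();
  ic->add_val_array(return_val);

  status_ = FINISH;
  responder_.Finish(reply_, Status::OK, this);
}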

A similar question about "c++ - RPC failure code 14 in our simple GRPC example program" can be found on Stack Overflow: https://stackoverflow.com/questions/45313159/
