
c++ - librdkafka program exits without error

Reposted. Author: 行者123. Updated: 2023-11-28 04:45:27

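I have a producer running on the main thread and a consumer running on its own thread (std::thread). I have a simple program that sends one message with the producer and then puts the main thread to sleep before trying to send another message.

As soon as my main thread goes to sleep, the program exits. There is no exception. The same thing happens when I try to stop and delete my consumer/producer properly. Clearly I am doing something wrong, but I can't tell what, because my program reports no error at all. The last log message I see is the one I print just before putting the main thread to sleep.

I have put try-catch blocks in both the main thread and my consumer thread. I have also called std::set_terminate and added logging inside the handler. When my program exits, neither the try-catch nor the terminate handler catches anything.

Any suggestions?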

Update #1 [Source]

As Sid S pointed out, I left out the obvious: the source.

main.cc

int main(int argc, char** argv) {
  std::cout << "% Main started." << std::endl;

  std::set_terminate([](){
    std::cerr << "% Terminate occurred in main." << std::endl;
    abort();
  });

  try {
    using com::anya::core::networking::KafkaMessenger;
    using com::anya::core::common::MessengerCode;
    KafkaMessenger messenger;

    auto promise = std::promise<bool>();
    auto future = promise.get_future();
    messenger.Connect([&promise](MessengerCode code, std::string& message) {
      promise.set_value(true);
    });
    future.get();

    std::cout << "% Main connection successful." << std::endl;

    // Produce 5 messages 5 seconds apart.
    int number_of_messages_sent = 0;
    while (number_of_messages_sent < 5) {
      std::stringstream message;
      message << "message-" << number_of_messages_sent;

      auto message_send_promise = std::promise<bool>();
      auto message_send_future = message_send_promise.get_future();
      messenger.SendMessage(message.str(), [&message_send_promise](MessengerCode code) {
        std::cout << "% Main message sent" << std::endl;
        message_send_promise.set_value(true);
      });
      message_send_future.get();

      number_of_messages_sent++;
      std::cout << "% Main going to sleep for 5 seconds." << std::endl;
      std::this_thread::sleep_for(std::chrono::seconds(5));
    }

    // Disconnect from Kafka and cleanup.
    auto disconnect_promise = std::promise<bool>();
    auto disconnect_future = disconnect_promise.get_future();
    messenger.Disconnect([&disconnect_promise](MessengerCode code, std::string& message) {
      disconnect_promise.set_value(true);
    });
    disconnect_future.get();
    std::cout << "% Main disconnect complete." << std::endl;
  } catch (std::exception& exception) {
    std::cerr << "% Exception caught in main with error: " << exception.what() << std::endl;
    exit(1);
  }

  std::cout << "% Main exited." << std::endl;
  exit(0);
}

KafkaMessenger.cc [consumer part]

void KafkaMessenger::Connect(std::function<void(MessengerCode , std::string&)> impl) {
  assert(!running_.load());
  running_.store(true);

  // For the sake of brevity I've removed a whole bunch of Kafka configuration setup from the sample code.

  RdKafka::ErrorCode consumer_response = consumer_->start(topic_for_consumer, 0, RdKafka::Topic::OFFSET_BEGINNING);

  if (consumer_response != RdKafka::ERR_NO_ERROR) {
    running_.store(false);
    delete consumer_;
    delete producer_;

    error = RdKafka::err2str(consumer_response);
    impl(MessengerCode::CONNECT_FAILED, error);
  }

  auto consumer_thread_started_promise = std::promise<bool>();
  auto consumer_thread_started_future = consumer_thread_started_promise.get_future();
  consumer_thread_ = std::thread([this, &topic_for_consumer, &consumer_thread_started_promise]() {
    try {
      std::cout << "% Consumer thread started." << std::endl;
      consumer_thread_started_promise.set_value(true);

      while (running_.load()) {
        RdKafka::Message* message = consumer_->consume(topic_for_consumer, 0, 5000);

        switch (message->err()) {
          case RdKafka::ERR_NO_ERROR: {
            std::string message_string((char*) message->payload());
            std::cout << "% Consumer received message: " << message_string << std::endl;
            delete message;
            break;
          }
          default:
            std::cerr << "% Consumer consumption failed: " << message->errstr() << " error code=" << message->err() << std::endl;
            break;
        }
      }

      std::cout << "% Consumer shutting down." << std::endl;
      if (consumer_->stop(topic_for_consumer, 0) != RdKafka::ERR_NO_ERROR) {
        std::cerr << "% Consumer error while trying to stop." << std::endl;
      }
    } catch (std::exception& exception) {
      std::cerr << "% Caught exception in consumer thread: " << exception.what() << std::endl;
    }
  });

  consumer_thread_started_future.get();
  std::string message("Consumer connected");
  impl(MessengerCode::CONNECT_SUCCESS, message);
}

KafkaMessenger.cc [producer part]

void KafkaMessenger::SendMessage(std::string message, std::function<void(MessengerCode)> impl) {
  assert(running_.load());
  std::cout << "% Producer sending message." << std::endl;

  RdKafka::ErrorCode producer_response = producer_->produce(
      producer_topic_,
      RdKafka::Topic::PARTITION_UA,
      RdKafka::Producer::RK_MSG_COPY,
      static_cast<void*>(&message), message.length(), nullptr, nullptr);

  switch (producer_response) {
    case RdKafka::ERR_NO_ERROR: {
      std::cout << "% Producer Successfully sent (" << message.length() << " bytes)" << std::endl;
      impl(MessengerCode::MESSAGE_SEND_SUCCESS);
      break;
    }
    case RdKafka::ERR__QUEUE_FULL: {
      std::cerr << "% Sending message failed: " << RdKafka::err2str(producer_response) << std::endl;
      impl(MessengerCode::MESSAGE_SEND_FAILED);
      break;
    }
    case RdKafka::ERR__UNKNOWN_PARTITION: {
      std::cerr << "% Sending message failed: " << RdKafka::err2str(producer_response) << std::endl;
      impl(MessengerCode::MESSAGE_SEND_FAILED);
      break;
    }
    case RdKafka::ERR__UNKNOWN_TOPIC: {
      std::cerr << "% Sending message failed: " << RdKafka::err2str(producer_response) << std::endl;
      impl(MessengerCode::MESSAGE_SEND_FAILED);
      break;
    }
    default: {
      std::cerr << "% Sending message failed: " << RdKafka::err2str(producer_response) << std::endl;
      impl(MessengerCode::MESSAGE_SEND_FAILED);
      break;
    }
  }
}
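
One detail in the produce() call above is worth a second look: static_cast<void*>(&message) passes the address of the std::string object itself, not of its character buffer, so the bytes that get copied are whatever the object's internal representation happens to contain. That could plausibly explain a payload that arrives mangled on the consumer side, though this is an inference and not something the original thread confirms. A minimal sketch of the intended behavior, using a hypothetical copy_payload helper as a stand-in for a copying send (it is not a librdkafka function):

```cpp
#include <cstddef>
#include <string>

// Hypothetical stand-in for a send that copies `len` bytes from `payload`,
// the way a copying produce call would. Not part of librdkafka.
std::string copy_payload(const void* payload, std::size_t len) {
    return std::string(static_cast<const char*>(payload), len);
}

// Sends the string's characters: data() points at the character buffer,
// whereas &message would point at the std::string object itself.
std::string send_characters(const std::string& message) {
    return copy_payload(message.data(), message.size());
}
```

With a produce() overload that takes a void* payload, the equivalent argument pair would presumably be const_cast<char*>(message.data()) and message.size().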

Output: when I run the main method, this is what I see in the console.

% Main started.
% Consumer thread started.
% Main connection successful.
% Producer sending message.
% Producer Successfully sent (9 bytes)
% Main message sent
% Main going to sleep for 5 seconds.
% Consumer received message: message-

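On closer inspection, I don't think the sleep is the cause, because the same thing still happens when I remove it. As you can see in the last log line, the consumer prints the message it received with the last character cut off. The payload should read message-0. So something, somewhere, is dying.

Update #2 [Stack trace]

I found this old but very useful post about catching signals and printing the stack. I implemented that solution, and now I can see more about where the crash happens.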

Error: signal 11:
0 main 0x00000001012e4eec _ZN3com4anya4core10networking7handlerEi + 28
1 libsystem_platform.dylib 0x00007fff60511f5a _sigtramp + 26
2 ??? 0x0000000000000000 0x0 + 0
3 main 0x00000001012f2866 rd_kafka_poll_cb + 838
4 main 0x0000000101315fee rd_kafka_q_serve + 590
5 main 0x00000001012f5d46 rd_kafka_flush + 182
6 main 0x00000001012e7f1a _ZN3com4anya4core10networking14KafkaMessenger10DisconnectENSt3__18functionIFvNS1_6common13MessengerCodeENS4_12basic_stringIcNS4_11char_traitsIcEENS4_9allocatorIcEEEEEEE + 218
7 main 0x00000001012dbc45 main + 3221
8 libdyld.dylib 0x00007fff60290115 start + 1
9 ??? 0x0000000000000001 0x0 + 1

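As part of my shutdown method I call producer_->flush(1000), and that call is what produces the stack trace above. If I remove it, shutdown works fine. So clearly I have misconfigured something, which then causes this segfault when I try to flush.

Update #3 [Solution]

It turns out that the classes handling logging for Kafka events and delivery reports were scoped to a method. That is a problem because librdkafka takes these objects by reference rather than copying them, so by the time my main runner method exited and started cleaning up, the objects were already gone. I moved the loggers to class scope, and that fixed the crash.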
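
The lifetime problem described above can be sketched without librdkafka at all. In the sketch below, Registry is a hypothetical stand-in for a library object that stores a non-owning pointer to a caller-owned callback; every name here is illustrative and is neither the poster's code nor librdkafka's API. Keeping the callback as a class member means it outlives any later call (such as a flush) that might invoke it:

```cpp
#include <string>
#include <vector>

// Hypothetical callback interface, loosely modelled on an event callback.
struct EventCallback {
    virtual ~EventCallback() = default;
    virtual void on_event(const std::string& what) = 0;
};

// Hypothetical stand-in for a library object that keeps a non-owning pointer.
class Registry {
public:
    void set_callback(EventCallback* cb) { cb_ = cb; }
    void fire(const std::string& what) { if (cb_) cb_->on_event(what); }
private:
    EventCallback* cb_ = nullptr;  // non-owning: the caller must keep it alive
};

struct LoggingCallback : EventCallback {
    std::vector<std::string> seen;
    void on_event(const std::string& what) override { seen.push_back(what); }
};

class Messenger {
public:
    // The callback is a member, so it lives as long as the Messenger does.
    // Had it been a local variable inside Connect(), registry_ would hold a
    // dangling pointer once Connect() returned.
    void Connect() { registry_.set_callback(&logger_); }
    void Flush() { registry_.fire("flush"); }
    const std::vector<std::string>& events() const { return logger_.seen; }
private:
    LoggingCallback logger_;  // declared before registry_, destroyed after it
    Registry registry_;
};
```

Declaring the callback before the object that points at it is a deliberate choice: members are destroyed in reverse declaration order, so the pointer holder goes away first.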

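Best answer

Kafka message payloads are just binary data. Unless you send a string with a trailing null byte, the payload will not contain one. Your std::string constructor therefore reads into adjacent memory looking for a null byte, possibly touching unmapped memory, which will crash your application or at the very least garble your terminal.

Use the message length together with the payload to construct a std::string limited to the actual number of bytes. It still won't be safe to print, but it's a start: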

std::string message_string((char*) message->payload(), message->len());
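
Since the length-bounded string may still hold arbitrary bytes, one way to make it printable is to escape anything non-printable before logging. The sketch below is only an illustration of that idea; payload_to_string and escape_for_log are hypothetical helper names, not part of librdkafka or of the original answer:

```cpp
#include <cctype>
#include <cstddef>
#include <cstdio>
#include <string>

// Builds a string from exactly `len` bytes; never scans for a terminator.
std::string payload_to_string(const void* payload, std::size_t len) {
    if (payload == nullptr || len == 0) return std::string();
    return std::string(static_cast<const char*>(payload), len);
}

// Replaces non-printable bytes with \xNN escapes so that binary data
// cannot garble the terminal when the string is logged.
std::string escape_for_log(const std::string& raw) {
    std::string out;
    for (unsigned char c : raw) {
        if (std::isprint(c)) {
            out.push_back(static_cast<char>(c));
        } else {
            char buf[5];  // backslash, 'x', two hex digits, terminator
            std::snprintf(buf, sizeof buf, "\\x%02X", static_cast<unsigned>(c));
            out += buf;
        }
    }
    return out;
}
```

In the consumer loop this would be called with message->payload() and message->len(), and the escaped result passed to the log statement.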

A similar question about c++ - librdkafka program exits without error can be found on Stack Overflow: https://stackoverflow.com/questions/49354496/
