- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
我有一个在主线程上运行的生产者和一个在其自己的线程 (std::thread) 上运行的消费者。我有一个简单的程序,它使用 Producer 发送一条消息,然后在尝试发送另一条消息之前让主线程进入休眠状态。
只要我的主线程进入休眠状态,程序就会存在。无一异常(exception)。当我尝试正确停止并删除我的消费者/生产者时,也会发生同样的事情。很明显,我做错了什么,但我不知道是什么,因为我的程序没有出现任何错误。我看到的最后一条日志消息是我在让主线程进入休眠状态之前打印的消息。
我已将 try-catch 放入主线程和我的消费者线程中。我还调用了 std::set_terminate 并在其中添加了日志记录。当我的程序退出 try-catch 或终止捕获任何东西时。
有什么建议吗?
更新 #1 [来源]
正如 Sid S 指出的那样,我错过了明显的来源。
主.cc
int main(int argc, char** argv) {
std::cout << "% Main started." << std::endl;
std::set_terminate([](){
std::cerr << "% Terminate occurred in main." << std::endl;
abort();
});
try {
using com::anya::core::networking::KafkaMessenger;
using com::anya::core::common::MessengerCode;
KafkaMessenger messenger;
auto promise = std::promise<bool>();
auto future = promise.get_future();
messenger.Connect([&promise](MessengerCode code, std::string& message) {
promise.set_value(true);
});
future.get();
std::cout << "% Main connection successful." << std::endl;
// Produce 5 messages 5 seconds apart.
int number_of_messages_sent = 0;
while (number_of_messages_sent < 5) {
std::stringstream message;
message << "message-" << number_of_messages_sent;
auto message_send_promise = std::promise<bool>();
auto message_send_future = message_send_promise.get_future();
messenger.SendMessage(message.str(), [&message_send_promise](MessengerCode code) {
std::cout << "% Main message sent" << std::endl;
message_send_promise.set_value(true);
});
message_send_future.get();
number_of_messages_sent++;
std::cout << "% Main going to sleep for 5 seconds." << std::endl;
std::this_thread::sleep_for(std::chrono::seconds(5));
}
// Disconnect from Kafka and cleanup.
auto disconnect_promise = std::promise<bool>();
auto disconnect_future = disconnect_promise.get_future();
messenger.Disconnect([&disconnect_promise](MessengerCode code, std::string& message) {
disconnect_promise.set_value(true);
});
disconnect_future.get();
std::cout << "% Main disconnect complete." << std::endl;
} catch (std::exception& exception) {
std::cerr << "% Exception caught in main with error: " << exception.what() << std::endl;
exit(1);
}
std::cout << "% Main exited." << std::endl;
exit(0);
}
KafkaMessenger.cc [消费者部分]
void KafkaMessenger::Connect(std::function<void(MessengerCode , std::string&)> impl) {
assert(!running_.load());
running_.store(true);
// For the sake of brevity I've removed a whole bunch of Kafka configuration setup from the sample code.
RdKafka::ErrorCode consumer_response = consumer_->start(topic_for_consumer, 0, RdKafka::Topic::OFFSET_BEGINNING);
if (consumer_response != RdKafka::ERR_NO_ERROR) {
running_.store(false);
delete consumer_;
delete producer_;
error = RdKafka::err2str(consumer_response);
impl(MessengerCode::CONNECT_FAILED, error);
}
auto consumer_thread_started_promise = std::promise<bool>();
auto consumer_thread_started_future = consumer_thread_started_promise.get_future();
consumer_thread_ = std::thread([this, &topic_for_consumer, &consumer_thread_started_promise]() {
try {
std::cout << "% Consumer thread started." << std ::endl;
consumer_thread_started_promise.set_value(true);
while (running_.load()) {
RdKafka::Message* message = consumer_->consume(topic_for_consumer, 0, 5000);
switch (message->err()) {
case RdKafka::ERR_NO_ERROR: {
std::string message_string((char*) message->payload());
std::cout << "% Consumer received message: " << message_string << std::endl;
delete message;
break;
}
default:
std::cerr << "% Consumer consumption failed: " << message->errstr() << " error code=" << message->err() << std::endl;
break;
}
}
std::cout << "% Consumer shutting down." << std::endl;
if (consumer_->stop(topic_for_consumer, 0) != RdKafka::ERR_NO_ERROR) {
std::cerr << "% Consumer error while trying to stop." << std::endl;
}
} catch (std::exception& exception) {
std::cerr << "% Caught exception in consumer thread: " << exception.what() << std::endl;
}
});
consumer_thread_started_future.get();
std::string message("Consumer connected");
impl(MessengerCode::CONNECT_SUCCESS, message);
}
KafkaMessenger.cc [生产者部分]
void KafkaMessenger::SendMessage(std::string message, std::function<void(MessengerCode)> impl) {
assert(running_.load());
std::cout << "% Producer sending message." << std::endl;
RdKafka::ErrorCode producer_response = producer_->produce(
producer_topic_,
RdKafka::Topic::PARTITION_UA,
RdKafka::Producer::RK_MSG_COPY,
static_cast<void*>(&message), message.length(), nullptr, nullptr);
switch (producer_response) {
case RdKafka::ERR_NO_ERROR: {
std::cout << "% Producer Successfully sent (" << message.length() << " bytes)" << std::endl;
impl(MessengerCode::MESSAGE_SEND_SUCCESS);
break;
}
case RdKafka::ERR__QUEUE_FULL: {
std::cerr << "% Sending message failed: " << RdKafka::err2str(producer_response) << std::endl;
impl(MessengerCode::MESSAGE_SEND_FAILED);
break;
}
case RdKafka::ERR__UNKNOWN_PARTITION: {
std::cerr << "% Sending message failed: " << RdKafka::err2str(producer_response) << std::endl;
impl(MessengerCode::MESSAGE_SEND_FAILED);
break;
}
case RdKafka::ERR__UNKNOWN_TOPIC: {
std::cerr << "% Sending message failed: " << RdKafka::err2str(producer_response) << std::endl;
impl(MessengerCode::MESSAGE_SEND_FAILED);
break;
}
default: {
std::cerr << "% Sending message failed: " << RdKafka::err2str(producer_response) << std::endl;
impl(MessengerCode::MESSAGE_SEND_FAILED);
break;
}
}
}
输出当我运行 main 方法时,这是我在控制台中看到的输出。
% Main started.
% Consumer thread started.
% Main connection successful.
% Producer sending message.
% Producer Successfully sent (9 bytes)
% Main message sent
% Main going to sleep for 5 seconds.
% Consumer received message: message-
经过仔细检查,我不认为 sleep 是造成这种情况的原因,因为当我移除 sleep 时,这种情况仍然会发生。正如您在最后一行日志中看到的那样,消费者打印它收到的消息并截断了最后一个字符。有效负载应读取 message-0。所以某个地方的某些东西正在消亡。
更新 #2 [堆栈跟踪]
我发现了这个古老但非常有用的 post关于捕获信号并打印出堆栈。我实现了这个解决方案,现在我可以看到有关崩溃位置的更多信息。
Error: signal 11:
0 main 0x00000001012e4eec _ZN3com4anya4core10networking7handlerEi + 28
1 libsystem_platform.dylib 0x00007fff60511f5a _sigtramp + 26
2 ??? 0x0000000000000000 0x0 + 0
3 main 0x00000001012f2866 rd_kafka_poll_cb + 838
4 main 0x0000000101315fee rd_kafka_q_serve + 590
5 main 0x00000001012f5d46 rd_kafka_flush + 182
6 main 0x00000001012e7f1a _ZN3com4anya4core10networking14KafkaMessenger10DisconnectENSt3__18functionIFvNS1_6common13MessengerCodeENS4_12basic_stringIcNS4_11char_traitsIcEENS4_9allocatorIcEEEEEEE + 218
7 main 0x00000001012dbc45 main + 3221
8 libdyld.dylib 0x00007fff60290115 start + 1
9 ??? 0x0000000000000001 0x0 + 1
作为我的关闭方法的一部分,我调用了 producer_->flush(1000),这会导致产生堆栈跟踪。如果我删除它,那么关机就可以了。很明显,我错误地配置了一些东西,然后在我尝试刷新时导致了这个段错误。
更新 #3 [解决方案]
事实证明,我处理 Kafka 事件和交付报告的日志记录的类被限定在一个方法中。这是一个问题,因为 librdkafka 库通过引用获取这些对象,所以当我的主运行程序方法退出并开始清理时,这些对象消失了。我将记录器的范围限定在类级别,这解决了崩溃问题。
最佳答案
Kafka 消息有效负载只是二进制数据,除非您发送一个带有尾随空字节的字符串,否则它不会包含这样的空字节,这会导致您的 std::string 构造函数读入相邻内存以寻找空字节,可能访问未映射的内存,这将导致您的应用程序崩溃,或者至少使您的终端乱码。
使用消息长度结合payload来构造一个限制为实际字节数的std::string,打印起来仍然不安全,但它是一个开始:
std::string message_string((char*) message->payload(), message->len());
关于c++ - librdkafka 程序无错退出,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/49354496/
我使用 librdkafka 库用 C 语言编写了 kafka 消费者和生产者。 Kafka代理版本是kafka_2.12-2.3.0。生产者正在成功生成消息,并且 dr_msg_cb 函数确认已成功
对于 Kafka 来说相对较新,我在实现 Kafka Producer 时遇到了麻烦,它从函数接收输入消息,并进一步根据定义的主题生成它。但困难在于,在主题上发布它需要花费大量时间。如果我尝试更改它的
我有一个在主线程上运行的生产者和一个在其自己的线程 (std::thread) 上运行的消费者。我有一个简单的程序,它使用 Producer 发送一条消息,然后在尝试发送另一条消息之前让主线程进入休眠
我正在运行 rdkafka_simple_producer.c 以向 Kafka 集群生成消息。我有一个主题和 30 个分区。使用默认的循环分区程序。当生产者工作并向 Kafka 生成消息时,我向 K
当我向消费者查询分配的主题分区列表时,结果中的所有分区的偏移量都为-1001。 如果我打印出接收到的消息的偏移量,则偏移量将设置为正确的值。 这是我用来消耗消息的代码: static void pri
我正在尝试在使用 CMake 构建的项目中使用 Kafka。 我安装了 librdkafka-dev(版本 0.8.6-1.1)。 我还尝试从 Git 下载项目并手动构建它,这似乎很成功。 我注意到
大家好,我是 kafka 新手,必须使用 C 语言开发一个项目。 我已经从kafka_2.10-0.8.2.0.tgz安装了代理和zookeeper bin/kafka-server-start.sh
static void DeliverCallback(rd_kafka_t *rk, void *payload, size_t len, rd_kafka_resp_err_t error_co
librdkafka c++ 库是否有用于单元测试的模拟服务器,不需要您启动 kafka 服务器?如果没有,那有没有简单易用的轻量级kafka服务器? librdkafka 库中的示例和单元测试是否需
我在 php 中有一个生产者,它发布到队列。我正在使用 php-rdkafka相同的图书馆。 以下是代码: $conf->set('log_level', LOG_DEBUG);
当我运行我的消费者组时,我总是有这样的统计数据: 2016-10-15 13:56:17.925: "STATS": { "name": "debian-meox#producer-2", "type
我使用 librdkafka 作为客户端消费者,我已经配置代理和客户端以支持 SSL,对于代理: listeners = PLAINTEXT://172.20.54.9:9092,SSL://172.
我正在尝试在 Heroku 上启动并运行一个简单的 Kafka 消费者。我正在使用 Node 8.11 和 node-rdkafka 。我已经在本地运行了。当我尝试在 Heroku 上运行它时,出现以
我正在使用 librdkafka C API 消费者(特别是使用 rd_kafka_consumer_poll 来读取并且我在这之前调用了 rd_kafka_poll_set_consumer) 我看
我在让 confluent-kafka-dotnet 库与 SSL 一起工作时遇到问题。没有 SSL 一切正常,我可以通过使用 kafkas 自己的脚本来让 SSL 工作,如下所示。 > .\kafk
我正在尝试使用 Kafka 发送约 10Mb 的消息。我知道它的默认大小是 1Mb,但这是一个硬性限制吗? librdkafka 可以支持 >10Mb 吗?如何设置? 最佳答案 您需要配置主题 max
我使用 librdkafka .net 客户端开发一个应用程序,该客户端连接到在 linux 操作系统中运行的 kafka 集群。我在下面公开了两个 key 来加密数据:{“安全协议(protocol
请帮助我在 windows xampp 上安装 librdkafka 以进行 php 开发。 PHP : 7.1.12, x86, 线程安全, MSVC14 我从 https://pecl.php.n
我试图在我的 docker 容器上运行来自 alpine 发行版的 librdkafka 1.3.0 版: FROM golang:1.13.6-alpine3.10 as base RUN apk
问题:RdKafka 安装程序未在 Visual Studio 15 中查找/识别 librdkafka 上下文: 为了在 visual studio 15 中开始使用 RdKafka,我运行了通用的
我是一名优秀的程序员,十分优秀!