gpt4 book ai didi

c++ - 在 C++ 中将序列化的 Thrift 结构序列化到 Kafka

转载 作者:太空狗 更新时间:2023-10-29 20:53:25 26 4
gpt4 key购买 nike

我在 Thrift 中定义了一组 structs,如下所示:

struct Foo {
1: i32 a,
2: i64 b
}

我需要在 C++ 中执行以下操作:

(a) 将 Foo 的实例序列化为与 Thrift 兼容的字节(使用 BinaryCompact Thrift 协议(protocol))

(b) 将字节序列化实例发送到 Kafka 主题

问题

如何将 Thrift 序列化实例发送到 Kafka 集群?

提前致谢

最佳答案

找出我自己问题的答案。

序列化

下面的代码片段说明了如何将 Foo 的实例序列化为 Thrift 兼容的字节(使用 Thrift Compact 协议(protocol))。为了使用 Binary 协议(protocol),将 TCompactProtocol 替换为 TBinaryProtocol

#include <thrift/transport/TBufferTransports.h>
#include <thrift/protocol/TCompactProtocol.h>

using apache::thrift::protocol::TCompactProtocol;
using apache::thrift::transport::TMemoryBuffer;

...
...
boost::shared_ptr<TMemoryBuffer> buffer(new TMemoryBuffer());
boost::shared_ptr<TCompactProtocol> protocol(new TCompactProtocol(buffer));
uint8_t **serialized_bytes = reinterpret_cast<uint8_t **>(malloc(sizeof(uint8_t *)));
uint32_t num_bytes = 0;

// 'foo' is an instance of Foo
foo->write(protocol.get());
buffer->getBuffer(serialized_bytes, &num_bytes);

发送到Kafka集群

以下代码片段说明了如何将与 Thrift 兼容的字节发送到 Kafka 集群。

注意:下面使用的kafka客户端库是librdkafka .

#include "rdkafkacpp.h"

std::string errstr;

// Create global configuration
RdKafka::Conf *conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
conf->set("metadata.broker.list", "localhost:9092", errstr);
conf->set("api.version.request", "true", errstr);

// Create kafka producer
RdKafka::Producer *producer = RdKafka::Producer::create(conf, errstr);

// Create topic-specific configuration
RdKafka::Topic *topic = RdKafka::Topic::create(producer, "topic_name", nullptr, errstr);

auto partition = 1;

// Sending the serialized bytes to Kafka cluster
auto res = producer->produce(
topic, partition,
RdKafka::Producer::RK_MSG_COPY /* Copy payload */,
serialized_bytes, num_bytes,
NULL, NULL);

if (res != RdKafka::ERR_NO_ERROR) {
std::cerr << "Failed to publish message" << RdKafka::err2str(res) << std::endl;
} else {
std::cout << "Published message of " << num_bytes << " bytes" << std::endl;
}

producer->flush(10000);

关于c++ - 在 C++ 中将序列化的 Thrift 结构序列化到 Kafka,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/42871143/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com