在 C++ 中将序列化的 Thrift 结构序列化到 Kafka

Serializing a serialized Thrift struct to Kafka in C++

我在Thrift中定义了一组structs,如下所示:

struct Foo {
  1: i32 a,
  2: i64 b
}

我需要在 C++ 中执行以下操作:

(a) 将 Foo 的实例序列化为与 Thrift 兼容的字节(使用 BinaryCompact Thrift 协议)

(b) 将字节序列化实例发送到 Kafka 主题

问题

如何将 Thrift 序列化实例发送到 Kafka 集群?

提前致谢

找出我自己问题的答案。

序列化

下面的代码片段说明了如何将 Foo 的实例序列化为 Thrift 兼容字节(使用 Thrift Compact 协议)。为了使用 Binary 协议,将 TCompactProtocol 替换为 TBinaryProtocol

#include <thrift/transport/TBufferTransports.h>
#include <thrift/protocol/TCompactProtocol.h>

using apache::thrift::protocol::TCompactProtocol;
using apache::thrift::transport::TMemoryBuffer;

...
...
boost::shared_ptr<TMemoryBuffer> buffer(new TMemoryBuffer());
boost::shared_ptr<TCompactProtocol> protocol(new TCompactProtocol(buffer));
uint8_t **serialized_bytes = reinterpret_cast<uint8_t **>(malloc(sizeof(uint8_t *)));
uint32_t num_bytes = 0;

// 'foo' is an instance of Foo
foo->write(protocol.get());
buffer->getBuffer(serialized_bytes, &num_bytes);

发送到 Kafka 集群

以下代码片段说明了如何将与 Thrift 兼容的字节发送到 Kafka 集群。

注意:下面使用的kafka客户端库是librdkafka.

#include "rdkafkacpp.h"

std::string errstr;

// Create global configuration
RdKafka::Conf *conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
conf->set("metadata.broker.list", "localhost:9092", errstr);
conf->set("api.version.request", "true", errstr);

// Create kafka producer
RdKafka::Producer *producer = RdKafka::Producer::create(conf, errstr);

// Create topic-specific configuration
RdKafka::Topic *topic = RdKafka::Topic::create(producer, "topic_name", nullptr, errstr);

auto partition = 1;

// Sending the serialized bytes to Kafka cluster
auto res = producer->produce(
    topic, partition,
    RdKafka::Producer::RK_MSG_COPY /* Copy payload */,
    serialized_bytes, num_bytes,
    NULL, NULL);

  if (res != RdKafka::ERR_NO_ERROR) {
    std::cerr << "Failed to publish message" << RdKafka::err2str(res) << std::endl;
  } else {
    std::cout << "Published message of " << num_bytes << " bytes" << std::endl;
  }

producer->flush(10000);