Flink - 将 pojo 序列化到 Kafka sink
Flink - serialize a pojo to Kafka sink
我的Flink代码结构是:
使用kafka获取数据(topic_1_in)->反序列化消息->映射->操作数据->获取POJO->序列化消息->使用kafka发送数据(topic_1_out)
我现在正处于序列化我的 POJO 的最后阶段。我在 Flink 网站上找到了以下示例:
DataStream<String> stream = ...
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
FlinkKafkaProducer<String> myProducer = new FlinkKafkaProducer<>(
"my-topic", // target topic
new SimpleStringSchema(), // serialization schema
properties, // producer config
FlinkKafkaProducer.Semantic.EXACTLY_ONCE); // fault-tolerance
stream.addSink(myProducer);
但我不明白如何实现序列化模式。
我也阅读了不同的可能性:
https://flink.apache.org/news/2020/04/15/flink-serialization-tuning-vol-1.html
但是,对于如何将我的 POJO 转换为字符串以提供给 Kafka 接收器,我还是有点困惑。 class 非常简单,所以我认为这很简单。
public class POJO_block {
public Double id;
public Double tr_p;
public Integer size;
public Double last_info;
public Long millis_last;
private ArrayList<Tuple3<Integer, Integer, Integer>> list_val;
}
任何例子都将不胜感激。
谢谢
问题中提到的link指的是Flink内部序列化,当Flink需要将我们的一些数据从集群的一部分运送到另一部分时使用,但在写入Kafka时无关紧要.
当 Flink 与 Kafka 等外部存储进行交互时,它依赖于 连接器 ,而这样做时序列化的方式也取决于该连接器的配置细节作为底层外部存储的特定机制(例如,在 kafka 记录的情况下,key 和 value 等概念)。
在您描述的情况下,因为您的程序正在使用 DataStream API 并且正在与 Kafka 通信,所以您使用的连接器是 Kafka Datastream API,其文档位于 here.
在您提供的代码中,FlinkKafkaProducer
接收器的这个参数指定了序列化的发生方式:
// this is probably not what you want:
new SimpleStringSchema(), // serialization schema
此配置无效,因为 SimpleStringSchema
需要字符串作为输入,因此 POJO_block
的流将使其失败。
您可以传递 org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema
的任何实现,包含一个主要功能,让您定义对应于每个 POJO_block
块实例的 kafka 键和值的字节值(即 T
下面):
ProducerRecord<byte[], byte[]> serialize(T element, @Nullable Long timestamp);
请注意,如果您使用 Table API 来读取和写入 Kafka 而不是 DataStream API,this connector would be used instead, which has a convenient format 配置具有现成的格式像 csv、json、avro、Debezium...
我的Flink代码结构是: 使用kafka获取数据(topic_1_in)->反序列化消息->映射->操作数据->获取POJO->序列化消息->使用kafka发送数据(topic_1_out)
我现在正处于序列化我的 POJO 的最后阶段。我在 Flink 网站上找到了以下示例:
DataStream<String> stream = ...
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
FlinkKafkaProducer<String> myProducer = new FlinkKafkaProducer<>(
"my-topic", // target topic
new SimpleStringSchema(), // serialization schema
properties, // producer config
FlinkKafkaProducer.Semantic.EXACTLY_ONCE); // fault-tolerance
stream.addSink(myProducer);
但我不明白如何实现序列化模式。
我也阅读了不同的可能性:
https://flink.apache.org/news/2020/04/15/flink-serialization-tuning-vol-1.html
但是,对于如何将我的 POJO 转换为字符串以提供给 Kafka 接收器,我还是有点困惑。 class 非常简单,所以我认为这很简单。
public class POJO_block {
public Double id;
public Double tr_p;
public Integer size;
public Double last_info;
public Long millis_last;
private ArrayList<Tuple3<Integer, Integer, Integer>> list_val;
}
任何例子都将不胜感激。
谢谢
问题中提到的link指的是Flink内部序列化,当Flink需要将我们的一些数据从集群的一部分运送到另一部分时使用,但在写入Kafka时无关紧要.
当 Flink 与 Kafka 等外部存储进行交互时,它依赖于 连接器 ,而这样做时序列化的方式也取决于该连接器的配置细节作为底层外部存储的特定机制(例如,在 kafka 记录的情况下,key 和 value 等概念)。
在您描述的情况下,因为您的程序正在使用 DataStream API 并且正在与 Kafka 通信,所以您使用的连接器是 Kafka Datastream API,其文档位于 here.
在您提供的代码中,FlinkKafkaProducer
接收器的这个参数指定了序列化的发生方式:
// this is probably not what you want:
new SimpleStringSchema(), // serialization schema
此配置无效,因为 SimpleStringSchema
需要字符串作为输入,因此 POJO_block
的流将使其失败。
您可以传递 org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema
的任何实现,包含一个主要功能,让您定义对应于每个 POJO_block
块实例的 kafka 键和值的字节值(即 T
下面):
ProducerRecord<byte[], byte[]> serialize(T element, @Nullable Long timestamp);
请注意,如果您使用 Table API 来读取和写入 Kafka 而不是 DataStream API,this connector would be used instead, which has a convenient format 配置具有现成的格式像 csv、json、avro、Debezium...