How can I add a message key to KafkaSink in Apache Flink 1.14?
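As the title says, I need to set a custom message key in KafkaSink. I cannot find anything in the Apache Flink 1.14 documentation on how to achieve this.
At the moment the KafkaSink is set up correctly and the data payload is written to the topic as expected, but the key is null.
Any suggestions? Thanks in advance.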
You should implement a KafkaRecordSerializationSchema that sets the key on the ProducerRecord returned by its serialize method.
You would create the sink more or less like this:
    KafkaSink<UsageRecord> sink =
        KafkaSink.<UsageRecord>builder()
            .setBootstrapServers(brokers)
            .setKafkaProducerConfig(kafkaProps)
            .setRecordSerializer(new MyRecordSerializationSchema(topic))
            .setDeliverGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
            .setTransactionalIdPrefix("my-record-producer")
            .build();
The serializer would look something like this:
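    import com.fasterxml.jackson.core.JsonProcessingException;
    import com.fasterxml.jackson.databind.ObjectMapper;
    import com.fasterxml.jackson.databind.SerializationFeature;
    import com.fasterxml.jackson.databind.json.JsonMapper;
    import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
    import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
    import org.apache.kafka.clients.producer.ProducerRecord;

    public class MyRecordSerializationSchema
            implements KafkaRecordSerializationSchema<UsageRecord> {

        private static final long serialVersionUID = 1L;

        // Static, so the mapper is not serialized along with the schema.
        private static final ObjectMapper objectMapper =
            JsonMapper.builder()
                .build()
                .registerModule(new JavaTimeModule())
                .configure(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS, false);

        private String topic;

        public MyRecordSerializationSchema() {}

        public MyRecordSerializationSchema(String topic) {
            this.topic = topic;
        }

        @Override
        public ProducerRecord<byte[], byte[]> serialize(
                UsageRecord element, KafkaSinkContext context, Long timestamp) {
            try {
                return new ProducerRecord<>(
                    topic,
                    null, // choosing not to specify the partition
                    element.ts.toEpochMilli(), // record timestamp
                    element.getKey(), // message key; must already be a byte[]
                    objectMapper.writeValueAsBytes(element)); // JSON payload
            } catch (JsonProcessingException e) {
                throw new IllegalArgumentException(
                    "Could not serialize record: " + element, e);
            }
        }
    }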
Note that this example also sets the timestamp.
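Because serialize returns a ProducerRecord<byte[], byte[]>, the key passed to it has to be a byte array. The answer does not say what type getKey() returns, so the following is only a minimal sketch under the assumption that the key is a String. KeyBytes and toKeyBytes are hypothetical names used for illustration; they are not part of Flink or Kafka.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical helper (not part of Flink or Kafka): encodes a String key
// as the byte[] that a ProducerRecord<byte[], byte[]> expects.
final class KeyBytes {
    private KeyBytes() {}

    static byte[] toKeyBytes(String key) {
        // Kafka accepts a null key; the record is then written without a key.
        return key == null ? null : key.getBytes(StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        byte[] key = toKeyBytes("user-42");
        System.out.println(key.length + " key bytes");
    }
}
```

Under that assumption, the key argument in serialize would become KeyBytes.toKeyBytes(element.getKey()). Encoding with a fixed charset keeps the bytes identical for equal keys, which matters because Kafka's default partitioner hashes the key bytes to pick a partition.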