如何在 Apache Flink 1.14 中将消息密钥添加到 KafkaSink

How can I add message key to KafkaSink in Apache Flink 1.14

如标题所述,我需要在 KafkaSink 中设置自定义 message key。我在 Apache Flink 1.14 文档中找不到有关如何实现此目的的任何说明。 目前我正在正确设置 KafkaSink 并且 data payload 正确写入 topic,但 keynull。 有什么建议么?提前致谢

您应该实现一个 KafkaRecordSerializationSchema,它在 serialize 方法返回的 ProducerRecord 上设置密钥。

您将像这样创建接收器 more-or-less:

KafkaSink<UsageRecord> sink =
    KafkaSink.<UsageRecord>builder()
             .setBootstrapServers(brokers)
             .setKafkaProducerConfig(kafkaProps)
             .setRecordSerializer(new MyRecordSerializationSchema(topic))
             .setDeliverGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
             .setTransactionalIdPrefix("my-record-producer")
             .build();

序列化程序将是这样的:

public class MyRecordSerializationSchema implements
        KafkaRecordSerializationSchema<T> {

    private static final long serialVersionUID = 1L;

    private String topic;
    private static final ObjectMapper objectMapper =
        JsonMapper.builder()
            .build()
            .registerModule(new JavaTimeModule())
            .configure(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS, false);

    public MyRecordSerializationSchema() {}

    public MyRecordSerializationSchema(String topic) {
        this.topic = topic;
    }

    @Override
    public ProducerRecord<byte[], byte[]> serialize(
            T element, KafkaSinkContext context, Long timestamp) {

        try {
            return new ProducerRecord<>(
                    topic,
                    null, // choosing not to specify the partition
                    element.ts.toEpochMilli(),
                    element.getKey(),
                    objectMapper.writeValueAsBytes(element));
        } catch (JsonProcessingException e) {
            throw new IllegalArgumentException(
                "Could not serialize record: " + element, e);
        }
    }
}

请注意,此示例还设置了时间戳。

FWIW,此示例来自 https://github.com/alpinegizmo/flink-mobile-data-usage/blob/main/src/main/java/com/ververica/flink/example/datausage/records/UsageRecordSerializationSchema.java