Flink statefun 和 confluent schema registry 兼容性

Flink statefun and confluent schema registry compatibility

我正在尝试从 flink statefun 出口到 confluent kafka。在 confluent git repo 为了进行模式检查并将数据放入 kafka 主题,我们需要做的就是使用带有 avro 对象的 kafka 客户端 ProducerRecord 对象。

但在 statefun 中,我们需要为 kafka 出口覆盖“ProducerRecord serialize”方法。这会导致以下错误。

Caused by: org.apache.kafka.common.errors.SerializationException: Error registering Avro schema: "bytes"

Schema registery 和 statefun kafka egress 似乎不兼容。有什么解决方法吗?

此版本的有状态函数不直接支持架构注册表, 但几乎没有解决方法:

  1. 您自己从 KafkaEgressSerializer class 连接到架构注册表。在您的链接示例中,需要发生 here.
  2. 提供您自己的基于 on 的 FlinkKafkaProducer 实例(请参阅 AvroDeserializationSchema)
  3. 在有状态函数之外管理模式,但将 Avro 记录序列化为字节。确保从传递给 KafkaProducer
  4. 的属性中删除架构注册表

可以将 Confluent Schema Registry 与 Statefun Egress 一起使用。

为此,您首先要在架构注册表中手动注册您的架构,然后提供 KafkaEgressSerializerKafkaAvroSerializer 实例序列化的 byte[]

下面的代码是它的要点,符合 Igal 解决方法建议中的第一个代码:

public class SpecificRecordFromAvroSchemaSerializer implements KafkaEgressSerializer<SpecificRecordGeneratedFromAvroSchema> {

    private static String KAFKA_TOPIC = "kafka_topic";

    private static CachedSchemaRegistryClient schemaRegistryClient = new CachedSchemaRegistryClient(
        "http://schema-registry:8081",
        1_000
    );
    private static KafkaAvroSerializer kafkaAvroSerializer = new KafkaAvroSerializer(schemaRegistryClient);

    static {
        try {
            schemaRegistryClient.register(
                KAFKA_TOPIC + "-value", // assuming subject name strategy is TopicNameStrategy (default)
                SpecificRecordGeneratedFromAvroSchema.getClassSchema()
            );
        } catch (IOException e) {
            e.printStackTrace();
        } catch (RestClientException e) {
            e.printStackTrace();
        }
    }

    @Override
    public ProducerRecord<byte[], byte[]> serialize(SpecificRecordGeneratedFromAvroSchema specificRecordGeneratedFromAvroSchema) {
        byte[] valueData = kafkaAvroSerializer.serialize(
            KAFKA_TOPIC,
            specificRecordGeneratedFromAvroSchema
        );

        return new ProducerRecord<>(
            KAFKA_TOPIC,
            String.valueOf(System.currentTimeMillis()).getBytes(),
            valueData
        );
    }

}