如何以编程方式更新 Confluent Schema Registry 中的主题模式和兼容性

How to programmatically update subject schema and compatibility in Confluent Schema Registry

我已经在模式注册表中注册了一个模式,我可以像这样使用 register() 来完成,

from schema_registry.client import SchemaRegistryClient, schema

subject_name = "new-schema"
schema_url = "https://{{ schemaRegistry }}:8081" 
sr = SchemaRegistryClient(schema_url)

schema = schema.AvroSchema({
    "namespace": "example.avro",
    "type": "record",
    "name": "user",
    "fields": [
        {"name": "fname", "type": "string"},
        {"name": "favorite_number",  "type": "int"}
    ]
})

my_schema = sr.register(subject_name, schema)

现在我需要用新字段更新同一主题,因此我将获得新的模式 ID,并且 version = 2

updated_schema = schema.AvroSchema({
    "namespace": "example.avro",
    "type": "record",
    "name": "user",
    "fields": [
        {"name": "fname", "type": "string"},
        {"name": "favorite_number",  "type": "int"},
        {"name": "favorite_food",  "type": "string"}
    ]
})

我尝试使用 sr.register(subject_name, updated_schema),它会针对同一主题抛出错误:

AttributeError: 'ClientError' object has no attribute '_get_object_id'
ClientError: Incompatible Avro schema

是的,此功能是注册新架构而不是更新。我没有得到任何更新功能,我不知道我该怎么做。那么我该如何更新架构?任何帮助将不胜感激。

在主题中注册新模式时,模式注册表会强制执行某些兼容性规则。因此,您需要确保主题的兼容模式与您正在寻找的模式演变相匹配。


使用confluent-kafka-python

from confluent_kafka.schema_registry import SchemaRegistryClient


sr = SchemaRegistryClient("https://schema-registry-host:8081")

# Options are: 
#   - NONE, FULL, BACKWARD, FORWARD, 
#   - BACKWARD_TRANSITIVE, FORWARD_TRANSITIVE, FULL_TRANSITIVE
sr.set_compatibility("yourSubjectName", "NONE")

使用python-schema-registry-client

from schema_registry.client import SchemaRegistryClient


sr = SchemaRegistryClient("https://schema-registry-host:8081")
sr.update_compatibility(level="NONE", subject="yourSubjectName")

有关兼容性类型的完整列表,请参阅 Confluent Documentation