如何使用 Python 在 Kafka Schema Registry 中以编程方式注册 Avro Schema
How to programmatically register an Avro Schema in Kafka Schema Registry using Python
我使用 python.
将数据和模式放入 kafka 和模式注册表
from confluent_kafka import avro
from confluent_kafka.avro import AvroProducer
# Avro schema (as JSON text) for the message value: a record named "myrecord"
# with four fields -- ID, PRODUCT, QUANTITY and PRICE. Every field is a union
# of "null" with "int" (or "string" for PRODUCT) and defaults to null, so any
# of them may be omitted from a record. The text is parsed with avro.loads()
# in the __main__ section of this script.
value_schema_str = """
{
"type":"record",
"name":"myrecord",
"fields":[
{
"name":"ID",
"type":[
"null",
"int"
],
"default":null
},
{
"name":"PRODUCT",
"type":[
"null",
"string"
],
"default":null
},
{
"name":"QUANTITY",
"type":[
"null",
"int"
],
"default":null
},
{
"name":"PRICE",
"type":[
"null",
"int"
],
"default":null
}
]
}
"""
# Avro schema (as JSON text) for the message key: a record named "key_schema"
# with a single non-nullable "int" field called ID. The text is parsed with
# avro.loads() in the __main__ section of this script.
key_schema_str = """
{
"type":"record",
"name":"key_schema",
"fields":[
{
"name":"ID",
"type":"int"
}
]
}
"""
def delivery_report(err, msg):
    """Print the delivery result of a produced message.

    Called once for each message produced to indicate the delivery result.
    Triggered by poll() or flush().

    Args:
        err: The delivery error, or ``None`` when the message was delivered.
        msg: The produced message; only ``topic()`` and ``partition()`` are
            read, and only on the success path.
    """
    if err is not None:
        print('Message delivery failed: {}'.format(err))
        return
    print('Message delivered to {} [{}]'.format(msg.topic(), msg.partition()))
if __name__ == '__main__':
    # Parse the JSON schema text into Avro schema objects.
    value_schema = avro.loads(value_schema_str)
    key_schema = avro.loads(key_schema_str)

    # Sample record and key that conform to the schemas above.
    value = {"ID": 199, "PRODUCT": "Yagiz Gulbahar", "QUANTITY": 1453, "PRICE": 61}
    key = {"ID": 199}

    # The producer is given both the broker address and the Schema Registry
    # URL, plus the schemas to use when produce() is not given explicit ones.
    avroProducer = AvroProducer(
        {
            'bootstrap.servers': '10.0.0.0:9092',
            'on_delivery': delivery_report,
            'schema.registry.url': 'http://10.0.0.0:8081',
        },
        default_key_schema=key_schema,
        default_value_schema=value_schema,
    )

    avroProducer.produce(topic='ersin_test_2', key=key, value=value)
    # flush() triggers the delivery callback (see delivery_report's docstring).
    avroProducer.flush()
我可以只放模式而不放数据吗?
提前致谢
您还可以使用 confluent-kafka-python
或 python-schema-registry-client
以编程方式注册模式。
# SchemaRegistryClient and Schema live in confluent_kafka.schema_registry,
# not in the legacy confluent_kafka.avro package.
from confluent_kafka.schema_registry import Schema, SchemaRegistryClient

# Avro schema (as JSON text): a record named "user" with a string field
# firstName and an int field age.
schema_str = """
{
"type": "record",
"name": "user",
"fields": [
{"name": "firstName", "type": "string"},
{"name": "age", "type": "int"}
]
}
"""

avro_schema = Schema(schema_str, 'AVRO')

# The client is configured with a dict; 'url' points at the Schema Registry.
sr = SchemaRegistryClient({'url': 'http://schema-registry-host:8081'})

# Register the schema under the subject using the client created above
# (the original called an undefined name `client`). No message is produced.
_schema_id = sr.register_schema("yourSubjectName", avro_schema)
使用python-schema-registry-client
from schema_registry.client import SchemaRegistryClient, schema

# Avro schema given as a dict: a record named "user" with a string field
# firstName and an int field age.
schema_ = schema.AvroSchema({
    "type": "record",
    "name": "user",
    "fields": [
        {"name": "firstName", "type": "string"},
        {"name": "age", "type": "int"},
    ],
})

# Create the python-schema-registry-client client explicitly; the original
# snippet used `sr` without ever constructing it from this library.
sr = SchemaRegistryClient(url="http://schema-registry-host:8081")

# Register the schema under the subject; no message is produced.
my_schema = sr.register("yourSubjectName", schema_)
我使用 python.
将数据和模式放入 kafka 和模式注册表
from confluent_kafka import avro
from confluent_kafka.avro import AvroProducer
# Avro schema (as JSON text) for the message value: a record named "myrecord"
# with four fields -- ID, PRODUCT, QUANTITY and PRICE. Every field is a union
# of "null" with "int" (or "string" for PRODUCT) and defaults to null, so any
# of them may be omitted from a record. The text is parsed with avro.loads()
# in the __main__ section of this script.
value_schema_str = """
{
"type":"record",
"name":"myrecord",
"fields":[
{
"name":"ID",
"type":[
"null",
"int"
],
"default":null
},
{
"name":"PRODUCT",
"type":[
"null",
"string"
],
"default":null
},
{
"name":"QUANTITY",
"type":[
"null",
"int"
],
"default":null
},
{
"name":"PRICE",
"type":[
"null",
"int"
],
"default":null
}
]
}
"""
# Avro schema (as JSON text) for the message key: a record named "key_schema"
# with a single non-nullable "int" field called ID. The text is parsed with
# avro.loads() in the __main__ section of this script.
key_schema_str = """
{
"type":"record",
"name":"key_schema",
"fields":[
{
"name":"ID",
"type":"int"
}
]
}
"""
def delivery_report(err, msg):
    """Print the delivery result of a produced message.

    Called once for each message produced to indicate the delivery result.
    Triggered by poll() or flush().

    Args:
        err: The delivery error, or ``None`` when the message was delivered.
        msg: The produced message; only ``topic()`` and ``partition()`` are
            read, and only on the success path.
    """
    if err is not None:
        print('Message delivery failed: {}'.format(err))
        return
    print('Message delivered to {} [{}]'.format(msg.topic(), msg.partition()))
if __name__ == '__main__':
    # Parse the JSON schema text into Avro schema objects.
    value_schema = avro.loads(value_schema_str)
    key_schema = avro.loads(key_schema_str)

    # Sample record and key that conform to the schemas above.
    value = {"ID": 199, "PRODUCT": "Yagiz Gulbahar", "QUANTITY": 1453, "PRICE": 61}
    key = {"ID": 199}

    # The producer is given both the broker address and the Schema Registry
    # URL, plus the schemas to use when produce() is not given explicit ones.
    avroProducer = AvroProducer(
        {
            'bootstrap.servers': '10.0.0.0:9092',
            'on_delivery': delivery_report,
            'schema.registry.url': 'http://10.0.0.0:8081',
        },
        default_key_schema=key_schema,
        default_value_schema=value_schema,
    )

    avroProducer.produce(topic='ersin_test_2', key=key, value=value)
    # flush() triggers the delivery callback (see delivery_report's docstring).
    avroProducer.flush()
我可以只放模式而不放数据吗?
提前致谢
您还可以使用 confluent-kafka-python
或 python-schema-registry-client
以编程方式注册模式。
# SchemaRegistryClient and Schema live in confluent_kafka.schema_registry,
# not in the legacy confluent_kafka.avro package.
from confluent_kafka.schema_registry import Schema, SchemaRegistryClient

# Avro schema (as JSON text): a record named "user" with a string field
# firstName and an int field age.
schema_str = """
{
"type": "record",
"name": "user",
"fields": [
{"name": "firstName", "type": "string"},
{"name": "age", "type": "int"}
]
}
"""

avro_schema = Schema(schema_str, 'AVRO')

# The client is configured with a dict; 'url' points at the Schema Registry.
sr = SchemaRegistryClient({'url': 'http://schema-registry-host:8081'})

# Register the schema under the subject using the client created above
# (the original called an undefined name `client`). No message is produced.
_schema_id = sr.register_schema("yourSubjectName", avro_schema)
使用python-schema-registry-client
from schema_registry.client import SchemaRegistryClient, schema

# Avro schema given as a dict: a record named "user" with a string field
# firstName and an int field age.
schema_ = schema.AvroSchema({
    "type": "record",
    "name": "user",
    "fields": [
        {"name": "firstName", "type": "string"},
        {"name": "age", "type": "int"},
    ],
})

# Create the python-schema-registry-client client explicitly; the original
# snippet used `sr` without ever constructing it from this library.
sr = SchemaRegistryClient(url="http://schema-registry-host:8081")

# Register the schema under the subject; no message is produced.
my_schema = sr.register("yourSubjectName", schema_)