如何在没有架构的情况下使用 python 生成 avro 类型?
How can I produce without schema as avro type with python?
我使用下面的代码向 Kafka 发送消息,代码可以正常工作。
但是我想发送不带架构(schema)的消息,因为这个 Kafka 主题的架构我已经事先注册过了。我不想每次都发送架构。
from confluent_kafka import avro
from confluent_kafka.avro import AvroProducer
# Avro schema (JSON text) for the message value: a record named "myrecord"
# with four fields (id, product, quantity, price). Each field is a
# ["null", <type>] union with a default of null, i.e. every field is optional.
value_schema_str = """
{
"type":"record",
"name":"myrecord",
"fields":[
{
"name":"id",
"type":[
"null",
"int"
],
"default":null
},
{
"name":"product",
"type":[
"null",
"string"
],
"default":null
},
{
"name":"quantity",
"type":[
"null",
"int"
],
"default":null
},
{
"name":"price",
"type":[
"null",
"int"
],
"default":null
}
]
}
"""
# Avro schema (JSON text) for the message key: a record named "key_schema"
# with a single non-nullable int field "id".
key_schema_str = """
{
"type":"record",
"name":"key_schema",
"fields":[
{
"name":"id",
"type":"int"
}
]
}
"""
def delivery_report(err, msg):
    """Print the delivery result of one produced message.

    Called once for each message produced to indicate the delivery result;
    triggered by poll() or flush(). It is registered through the
    ``on_delivery`` entry of the producer configuration.

    Args:
        err: The delivery error, or ``None`` when delivery succeeded.
        msg: The produced message; only ``topic()`` and ``partition()``
            are read, and only on success.
    """
    if err is not None:
        print('Message delivery failed: {}'.format(err))
        return
    print('Message delivered to {} [{}]'.format(msg.topic(), msg.partition()))
if __name__ == '__main__':
    # Parse the inline JSON schema strings into Avro schema objects.
    value_schema = avro.loads(value_schema_str)
    key_schema = avro.loads(key_schema_str)
    # Example payload, left disabled in the question:
    #value = {"id": 1, "product": "myProduct", "quantity": 10, "price": 100}
    key = {"id": 1}
    avroProducer = AvroProducer({
        'bootstrap.servers': '10.0.0.0:9092',
        'on_delivery': delivery_report,
        'schema.registry.url': 'http://10.0.0.0:8081'
    }, default_key_schema=key_schema, default_value_schema=value_schema)
    # Only the key is passed here; no value is supplied to produce().
    avroProducer.produce(topic='orders', key=key)
    # flush() triggers the delivery_report callback (see its docstring).
    avroProducer.flush()
提前致谢
I do not want to send schema everytime.
Avro 必须要有架构(schema),没有例外。
I have schema on kafka topic
Kafka 主题本身并没有架构。我猜你的意思是你已经在 Schema Registry(架构注册表)中注册了架构?那么在生产者中使用它之前,必须先把它获取下来
# Create a Schema Registry client pointed at the registry URL, so that the
# already-registered schemas can be fetched instead of being defined inline.
from confluent_kafka.avro import CachedSchemaRegistryClient
sr_client = CachedSchemaRegistryClient({'url': "http://10.0.0.0:8081"})
然后用该客户端发起一次 get_schema 调用:
# Option 1: fetch the latest key/value schemas with confluent_kafka's own client.
from confluent_kafka.avro.cached_schema_registry_client import CachedSchemaRegistryClient
sr = CachedSchemaRegistryClient({"url": "http://localhost:8081"})
# Subjects are looked up as "orders-value" / "orders-key".
# NOTE(review): element [1] of the result is used as the schema object;
# presumably the call returns (id, schema, version) - confirm against the client API.
value_schema = sr.get_latest_schema("orders-value")[1]
key_schema= sr.get_latest_schema("orders-key")[1]
# Option 2: fetch the same schemas with the separate
# python-schema-registry-client package.
# pip install python-schema-registry-client
from schema_registry.client import SchemaRegistryClient
# NOTE(review): the URL is given without an "http://" scheme - confirm the
# client accepts this form.
sr = SchemaRegistryClient('localhost:8081')
# NOTE(review): `.schema` is this package's own schema object - verify it is
# accepted by AvroProducer's default_key_schema / default_value_schema.
value_schema = sr.get_schema('orders-value', version='latest').schema
key_schema = sr.get_schema('orders-key', version='latest').schema
最后:
from confluent_kafka import avro
from confluent_kafka.avro import AvroProducer
def delivery_report(err, msg):
    """Print the delivery result of one produced message.

    Called once for each message produced to indicate the delivery result;
    triggered by poll() or flush(). It is registered through the
    ``on_delivery`` entry of the producer configuration.

    Args:
        err: The delivery error, or ``None`` when delivery succeeded.
        msg: The produced message; only ``topic()`` and ``partition()``
            are read, and only on success.
    """
    if err is not None:
        print('Message delivery failed: {}'.format(err))
        return
    print('Message delivered to {} [{}]'.format(msg.topic(), msg.partition()))
if __name__ == '__main__':
    # key_schema and value_schema are not defined in this snippet; they are
    # expected to have been fetched from the Schema Registry beforehand.
    value = {"id": 1, "product": "myProduct", "quantity": 10, "price": 100}
    key = {"id": 1}
    avroProducer = AvroProducer({
        'bootstrap.servers': '10.0.0.0:9092',
        'on_delivery': delivery_report,
        'schema.registry.url': 'http://10.0.0.0:8081'
    }, default_key_schema=key_schema, default_value_schema=value_schema)
    avroProducer.produce(topic='orders', key=key, value=value)
    # flush() triggers the delivery_report callback (see its docstring).
    avroProducer.flush()
我使用下面的代码向 Kafka 发送消息,代码可以正常工作。
但是我想发送不带架构(schema)的消息,因为这个 Kafka 主题的架构我已经事先注册过了。我不想每次都发送架构。
from confluent_kafka import avro
from confluent_kafka.avro import AvroProducer
# Avro schema (JSON text) for the message value: a record named "myrecord"
# with four fields (id, product, quantity, price). Each field is a
# ["null", <type>] union with a default of null, i.e. every field is optional.
value_schema_str = """
{
"type":"record",
"name":"myrecord",
"fields":[
{
"name":"id",
"type":[
"null",
"int"
],
"default":null
},
{
"name":"product",
"type":[
"null",
"string"
],
"default":null
},
{
"name":"quantity",
"type":[
"null",
"int"
],
"default":null
},
{
"name":"price",
"type":[
"null",
"int"
],
"default":null
}
]
}
"""
# Avro schema (JSON text) for the message key: a record named "key_schema"
# with a single non-nullable int field "id".
key_schema_str = """
{
"type":"record",
"name":"key_schema",
"fields":[
{
"name":"id",
"type":"int"
}
]
}
"""
def delivery_report(err, msg):
    """Print the delivery result of one produced message.

    Called once for each message produced to indicate the delivery result;
    triggered by poll() or flush(). It is registered through the
    ``on_delivery`` entry of the producer configuration.

    Args:
        err: The delivery error, or ``None`` when delivery succeeded.
        msg: The produced message; only ``topic()`` and ``partition()``
            are read, and only on success.
    """
    if err is not None:
        print('Message delivery failed: {}'.format(err))
        return
    print('Message delivered to {} [{}]'.format(msg.topic(), msg.partition()))
if __name__ == '__main__':
    # Parse the inline JSON schema strings into Avro schema objects.
    value_schema = avro.loads(value_schema_str)
    key_schema = avro.loads(key_schema_str)
    # Example payload, left disabled in the question:
    #value = {"id": 1, "product": "myProduct", "quantity": 10, "price": 100}
    key = {"id": 1}
    avroProducer = AvroProducer({
        'bootstrap.servers': '10.0.0.0:9092',
        'on_delivery': delivery_report,
        'schema.registry.url': 'http://10.0.0.0:8081'
    }, default_key_schema=key_schema, default_value_schema=value_schema)
    # Only the key is passed here; no value is supplied to produce().
    avroProducer.produce(topic='orders', key=key)
    # flush() triggers the delivery_report callback (see its docstring).
    avroProducer.flush()
提前致谢
I do not want to send schema everytime.
Avro 必须要有架构(schema),没有例外。
I have schema on kafka topic
Kafka 主题本身并没有架构。我猜你的意思是你已经在 Schema Registry(架构注册表)中注册了架构?那么在生产者中使用它之前,必须先把它获取下来
# Create a Schema Registry client pointed at the registry URL, so that the
# already-registered schemas can be fetched instead of being defined inline.
from confluent_kafka.avro import CachedSchemaRegistryClient
sr_client = CachedSchemaRegistryClient({'url': "http://10.0.0.0:8081"})
然后用该客户端发起一次 get_schema 调用:
# Option 1: fetch the latest key/value schemas with confluent_kafka's own client.
from confluent_kafka.avro.cached_schema_registry_client import CachedSchemaRegistryClient
sr = CachedSchemaRegistryClient({"url": "http://localhost:8081"})
# Subjects are looked up as "orders-value" / "orders-key".
# NOTE(review): element [1] of the result is used as the schema object;
# presumably the call returns (id, schema, version) - confirm against the client API.
value_schema = sr.get_latest_schema("orders-value")[1]
key_schema= sr.get_latest_schema("orders-key")[1]
# Option 2: fetch the same schemas with the separate
# python-schema-registry-client package.
# pip install python-schema-registry-client
from schema_registry.client import SchemaRegistryClient
# NOTE(review): the URL is given without an "http://" scheme - confirm the
# client accepts this form.
sr = SchemaRegistryClient('localhost:8081')
# NOTE(review): `.schema` is this package's own schema object - verify it is
# accepted by AvroProducer's default_key_schema / default_value_schema.
value_schema = sr.get_schema('orders-value', version='latest').schema
key_schema = sr.get_schema('orders-key', version='latest').schema
最后:
from confluent_kafka import avro
from confluent_kafka.avro import AvroProducer
def delivery_report(err, msg):
    """Print the delivery result of one produced message.

    Called once for each message produced to indicate the delivery result;
    triggered by poll() or flush(). It is registered through the
    ``on_delivery`` entry of the producer configuration.

    Args:
        err: The delivery error, or ``None`` when delivery succeeded.
        msg: The produced message; only ``topic()`` and ``partition()``
            are read, and only on success.
    """
    if err is not None:
        print('Message delivery failed: {}'.format(err))
        return
    print('Message delivered to {} [{}]'.format(msg.topic(), msg.partition()))
if __name__ == '__main__':
    # key_schema and value_schema are not defined in this snippet; they are
    # expected to have been fetched from the Schema Registry beforehand.
    value = {"id": 1, "product": "myProduct", "quantity": 10, "price": 100}
    key = {"id": 1}
    avroProducer = AvroProducer({
        'bootstrap.servers': '10.0.0.0:9092',
        'on_delivery': delivery_report,
        'schema.registry.url': 'http://10.0.0.0:8081'
    }, default_key_schema=key_schema, default_value_schema=value_schema)
    avroProducer.produce(topic='orders', key=key, value=value)
    # flush() triggers the delivery_report callback (see its docstring).
    avroProducer.flush()