Avro Producer 在没有密钥模式的情况下发送密钥
Avro Producer sending key without key schema
我在 Python 2.7 中使用 Avro Producer。我需要发送一条带有键和值的消息,
值在主题中有 Avro-Schema,但键没有 Avro-Schema(我无法为键添加 Schema - 遗留原因)。
这是我的代码:
def main():
kafkaBrokers = os.environ.get('KAFKA_BROKERS')
schemaRegistry = os.environ.get('SCHEMA_REGISTRY')
topic = os.environ.get('KAFKA_TOPIC')
subject = '${}-value'.format(topic)
sr = CachedSchemaRegistryClient(schemaRegistry)
schema = sr.get_latest_schema(subject).schema
value_schema = avro.loads(str(schema))
url = 'test.com'
value = {'url': u'test.com', 'priority': 10}
avroProducer = AvroProducer({
'bootstrap.servers': kafkaBrokers,
'schema.registry.url': schemaRegistry
}, default_value_schema=value_schema)
key = 1638895406382020875
avroProducer.produce(topic=topic, value=value, key=key)
avroProducer.flush()
我收到以下错误:
raise KeySerializerError("Avro schema required for key")
confluent_kafka.avro.serializer.KeySerializerError: Avro schema required for key
如果我从生成函数中删除密钥:
avroProducer.produce(topic=topic, value=value)
有效。
如何在没有架构的情况下发送密钥?
AvroProducer
假定键和值都使用模式注册表进行编码,在键和值的有效负载前添加一个魔术字节和模式 ID。
如果您想对密钥使用自定义序列化,可以使用 Producer
而不是 AvroProducer
。但是您有责任序列化键(使用您想要的任何格式)和值(这意味着对值进行编码并在前面加上魔术字节和模式 ID)。要了解这是如何完成的,您可以查看 AvroProducer
代码。
但这也意味着您必须自己编写 AvroConsumer
并且无法使用 kafka-avro-console-consumer
。
您需要使用常规 Producer 并自行执行序列化函数
from confluent_kafka import avro
from confluent_kafka.avro import CachedSchemaRegistryClient
from confluent_kafka.avro.serializer.message_serializer import MessageSerializer as AvroSerializer
avro_serializer = AvroSerializer(schema_registry)
serialize_avro = avro_serializer.encode_record_with_schema # extract function definition
value_schema = avro.load('avro_schemas/value.avsc') # TODO: Create avro_schemas folder
p = Producer({'bootstrap.servers': bootstrap_servers})
value_payload = serialize_avro(topic, value_schema, value, is_key=False)
p.produce(topic, key=key, value=value_payload, callback=delivery_report)
我在 Python 2.7 中使用 Avro Producer。我需要发送一条带有键和值的消息, 值在主题中有 Avro-Schema,但键没有 Avro-Schema(我无法为键添加 Schema - 遗留原因)。
这是我的代码:
def main():
kafkaBrokers = os.environ.get('KAFKA_BROKERS')
schemaRegistry = os.environ.get('SCHEMA_REGISTRY')
topic = os.environ.get('KAFKA_TOPIC')
subject = '${}-value'.format(topic)
sr = CachedSchemaRegistryClient(schemaRegistry)
schema = sr.get_latest_schema(subject).schema
value_schema = avro.loads(str(schema))
url = 'test.com'
value = {'url': u'test.com', 'priority': 10}
avroProducer = AvroProducer({
'bootstrap.servers': kafkaBrokers,
'schema.registry.url': schemaRegistry
}, default_value_schema=value_schema)
key = 1638895406382020875
avroProducer.produce(topic=topic, value=value, key=key)
avroProducer.flush()
我收到以下错误:
raise KeySerializerError("Avro schema required for key")
confluent_kafka.avro.serializer.KeySerializerError: Avro schema required for key
如果我从生成函数中删除密钥:
avroProducer.produce(topic=topic, value=value)
有效。
如何在没有架构的情况下发送密钥?
AvroProducer
假定键和值都使用模式注册表进行编码,在键和值的有效负载前添加一个魔术字节和模式 ID。
如果您想对密钥使用自定义序列化,可以使用 Producer
而不是 AvroProducer
。但是您有责任序列化键(使用您想要的任何格式)和值(这意味着对值进行编码并在前面加上魔术字节和模式 ID)。要了解这是如何完成的,您可以查看 AvroProducer
代码。
但这也意味着您必须自己编写 AvroConsumer
并且无法使用 kafka-avro-console-consumer
。
您需要使用常规 Producer 并自行执行序列化函数
from confluent_kafka import avro
from confluent_kafka.avro import CachedSchemaRegistryClient
from confluent_kafka.avro.serializer.message_serializer import MessageSerializer as AvroSerializer
avro_serializer = AvroSerializer(schema_registry)
serialize_avro = avro_serializer.encode_record_with_schema # extract function definition
value_schema = avro.load('avro_schemas/value.avsc') # TODO: Create avro_schemas folder
p = Producer({'bootstrap.servers': bootstrap_servers})
value_payload = serialize_avro(topic, value_schema, value, is_key=False)
p.produce(topic, key=key, value=value_payload, callback=delivery_report)