Convert Avro-serialized messages into JSON using a Python consumer

from kafka import KafkaConsumer
import json
import io

if __name__ == '__main__':

  # consumer = KafkaConsumer(
  #     'ldt_lm_mytable',
  #     bootstrap_servers = 'localhost:9092',
  #     auto_offset_reset = 'earliest',
  #     group_id = 'consumer_group_a')

  KAFKA_HOSTS = ['kafka:9092']
  KAFKA_VERSION = (0, 10)
  topic = "ldt_lm_mytable"

  consumer = KafkaConsumer(topic, bootstrap_servers=KAFKA_HOSTS, api_version=KAFKA_VERSION)

  for msg in consumer:
    print('Lead = {}'.format(json.loads(msg.value)))

Nothing gets printed. I am using the Avro converter when producing data to the topic (via Debezium). I have tried a few converters found on the internet, but none of them work. One of them looks like this:

bytes_reader = io.BytesIO(consumer)
decoder = avro.io.BinaryDecoder(bytes_reader)
reader = avro.io.DatumReader(schema)
decoded_data = reader.read(decoder)

In this converter, where would I get the value for the 'schema' variable? And how do I load the 'avro' package? 'io.BytesIO' gives me an error:

Traceback (most recent call last):
  File "consumer.py", line 19, in <module>
    bytes_reader = io.BytesIO(consumer)
TypeError: a bytes-like object is required, not 'KafkaConsumer'

Thanks in advance!

Assuming the Debezium connector is using the standard io.confluent.connect.avro.AvroConverter in Kafka Connect, you need to use the Avro deserializer that ships with the Confluent Schema Registry.

Here is an example consumer from here:

from confluent_kafka.avro import AvroConsumer
from confluent_kafka.avro.serializer import SerializerError


c = AvroConsumer({
    'bootstrap.servers': 'mybroker,mybroker2',
    'group.id': 'groupid',
    'schema.registry.url': 'http://127.0.0.1:8081'})

c.subscribe(['my_topic'])

while True:
    try:
        msg = c.poll(10)

    except SerializerError as e:
        print("Message deserialization failed for {}: {}".format(msg, e))
        break

    if msg is None:
        continue

    if msg.error():
        print("AvroConsumer error: {}".format(msg.error()))
        continue

    print(msg.value())

c.close()
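To answer where the 'schema' variable comes from: messages written through Confluent's AvroConverter are not bare Avro. Each value carries a small header with the ID of its schema, and the schema itself lives in the Schema Registry. Also note that `io.BytesIO` must wrap a single `msg.value` (bytes), not the `KafkaConsumer` object itself, which is what caused the `TypeError` above. Here is a minimal sketch (the framing rules are from Confluent's wire format; the demo value is fabricated) of splitting that header by hand:

```python
# Sketch only: parsing the Confluent Avro wire format by hand.
# Each message value starts with a 5-byte header: a magic byte (0)
# followed by the 4-byte big-endian schema ID from the Schema Registry.
import struct

def parse_confluent_header(raw_bytes):
    """Split a framed message value into (schema_id, avro_payload)."""
    magic, schema_id = struct.unpack('>bI', raw_bytes[:5])
    if magic != 0:
        raise ValueError('not a Confluent Avro-framed message')
    return schema_id, raw_bytes[5:]

# Demo with a fabricated value claiming schema ID 42:
schema_id, payload = parse_confluent_header(struct.pack('>bI', 0, 42) + b'\x02hi')
print(schema_id)  # -> 42
```

Given `schema_id`, you can fetch the schema text from the registry (`GET /schemas/ids/<id>`), parse it with the avro package's schema parser, and then the snippet from the question — `avro.io.DatumReader(schema)` reading from `avro.io.BinaryDecoder(io.BytesIO(payload))` — will decode `payload`. In practice, `AvroConsumer` does all of this for you.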