Kafka 消费者:如何读取 Python 中的特定 Avro 字段?

Kafka Consumer: How to read specific Avro field in Python?

在下面的消费者代码片段中,我能够接收到发送的数据。我如何从整个数据中访问特定值以进行处理。

from confluent_kafka import KafkaError
from confluent_kafka.avro import AvroConsumer
from confluent_kafka.avro.serializer import (SerializerError,
                                             KeySerializerError,
                                             ValueSerializerError)

***
***
***

c.subscribe(['Topic'])

while True:
    try:
        msg = c.poll(10)
        print(msg)

谢谢

我看到你正在导入 AvroConsumer,所以你会

c.value()['field'] 

实际上有两种方法可以实现此目的:

msg.value()['myFieldName']

msg.value().get('myFieldName')

例如,

c = AvroConsumer({
    'bootstrap.servers': 'localhost:9092',
    'schema.registry.url': 'localhost:8081',
    'group.id': 'test-group'
})


c.subscribe(['Topic'])

while True:
    try:
        msg = c.poll(10)

        if msg:
            print(f"field1 Value: {msg.value()['field1']}")
            print(f"field2 Value: {msg.value().get('field2')}")

        else: 
            pass
    except SerializerError as e:
        print(f"Message deserialization failed for message {msg}:\n{e}")