Kafka 消费者:如何读取 Python 中的特定 Avro 字段?
Kafka Consumer: How to read specific Avro field in Python?
在下面的消费者代码片段中,我能够接收到发送的数据。我如何从整个数据中访问特定值以进行处理。
from confluent_kafka import KafkaError
from confluent_kafka.avro import AvroConsumer
from confluent_kafka.avro.serializer import (SerializerError,
KeySerializerError,
ValueSerializerError)
***
***
***
c.subscribe(['Topic'])
while True:
try:
msg = c.poll(10)
print(msg)
谢谢
我看到你正在导入 AvroConsumer,所以你会
c.value()['field']
实际上有两种方法可以实现此目的:
msg.value()['myFieldName']
或
msg.value().get('myFieldName')
例如,
c = AvroConsumer({
'bootstrap.servers': 'localhost:9092',
'schema.registry.url': 'localhost:8081',
'group.id': 'test-group'
})
c.subscribe(['Topic'])
while True:
try:
msg = c.poll(10)
if msg:
print(f"field1 Value: {msg.value()['field1']}")
print(f"field2 Value: {msg.value().get('field2')}")
else:
pass
except SerializerError as e:
print(f"Message deserialization failed for message {msg}:\n{e}")
在下面的消费者代码片段中,我能够接收到发送的数据。我如何从整个数据中访问特定值以进行处理。
from confluent_kafka import KafkaError
from confluent_kafka.avro import AvroConsumer
from confluent_kafka.avro.serializer import (SerializerError,
KeySerializerError,
ValueSerializerError)
***
***
***
c.subscribe(['Topic'])
while True:
try:
msg = c.poll(10)
print(msg)
谢谢
我看到你正在导入 AvroConsumer,所以你会
c.value()['field']
实际上有两种方法可以实现此目的:
msg.value()['myFieldName']
或
msg.value().get('myFieldName')
例如,
c = AvroConsumer({
'bootstrap.servers': 'localhost:9092',
'schema.registry.url': 'localhost:8081',
'group.id': 'test-group'
})
c.subscribe(['Topic'])
while True:
try:
msg = c.poll(10)
if msg:
print(f"field1 Value: {msg.value()['field1']}")
print(f"field2 Value: {msg.value().get('field2')}")
else:
pass
except SerializerError as e:
print(f"Message deserialization failed for message {msg}:\n{e}")