使用 python 消费者将 avro 序列化消息转换为 json
Convert avro serialized messages into json using python consumer
from kafka import KafkaConsumer
import json
import io

if __name__ == '__main__':
    # Earlier attempt, kept for reference:
    # consumer = KafkaConsumer(
    #     'ldt_lm_mytable',
    #     bootstrap_servers='localhost:9092',
    #     auto_offset_reset='earliest',
    #     group_id='consumer_group_a')
    KAFKA_HOSTS = ['kafka:9092']
    KAFKA_VERSION = (0, 10)
    topic = "ldt_lm_mytable"

    consumer = KafkaConsumer(topic, bootstrap_servers=KAFKA_HOSTS, api_version=KAFKA_VERSION)

    # NOTE(review): if the producer (Debezium) wrote Avro-encoded bytes,
    # json.loads on msg.value will not decode them — see the answer below.
    for msg in consumer:
        print('Lead = {}'.format(json.loads(msg.value)))
没有任何输出。在向该主题生成数据时(通过 Debezium),我使用的是 avro 转换器(AvroConverter)。我试过网上找到的一些反序列化代码片段,但都不起作用。其中之一是这样的:
# Attempted raw Avro decoding (does not run as-is: `avro` is never imported,
# `schema` is undefined, and `consumer` is a KafkaConsumer object rather than
# the bytes of a single message — hence the TypeError below).
bytes_reader = io.BytesIO(consumer)
decoder = avro.io.BinaryDecoder(bytes_reader)
reader = avro.io.DatumReader(schema)
decoded_data = reader.read(decoder)
在这段代码中,'schema' 变量的值应该从哪里获得?'avro' 包又该如何导入?另外,'io.BytesIO' 还给我报了一个错误:
Traceback (most recent call last):
File "consumer.py", line 19, in <module>
bytes_reader = io.BytesIO(consumer)
TypeError: a bytes-like object is required, not 'KafkaConsumer'
提前致谢!
假设 Debezium 连接器在 Kafka Connect 中使用的是标准的 io.confluent.connect.avro.AvroConverter,那么您需要使用与 Confluent Schema Registry 配套的 Avro 反序列化器。
下面是一个消费者示例(摘自 confluent-kafka-python 官方文档):
from confluent_kafka.avro import AvroConsumer
from confluent_kafka.avro.serializer import SerializerError

# AvroConsumer looks up the writer schema in the Schema Registry and
# deserializes each Confluent-Avro message into a Python dict automatically.
consumer = AvroConsumer({
    'bootstrap.servers': 'mybroker,mybroker2',
    'group.id': 'groupid',
    'schema.registry.url': 'http://127.0.0.1:8081'})
consumer.subscribe(['my_topic'])

while True:
    try:
        msg = consumer.poll(10)
    except SerializerError as e:
        print("Message deserialization failed for {}: {}".format(msg, e))
        break

    if msg is None:
        continue
    if msg.error():
        print("AvroConsumer error: {}".format(msg.error()))
        continue

    print(msg.value())

consumer.close()
from kafka import KafkaConsumer
import json
import io

if __name__ == '__main__':
    # Earlier attempt, kept for reference:
    # consumer = KafkaConsumer(
    #     'ldt_lm_mytable',
    #     bootstrap_servers='localhost:9092',
    #     auto_offset_reset='earliest',
    #     group_id='consumer_group_a')
    KAFKA_HOSTS = ['kafka:9092']
    KAFKA_VERSION = (0, 10)
    topic = "ldt_lm_mytable"

    consumer = KafkaConsumer(topic, bootstrap_servers=KAFKA_HOSTS, api_version=KAFKA_VERSION)

    # NOTE(review): if the producer (Debezium) wrote Avro-encoded bytes,
    # json.loads on msg.value will not decode them — see the answer below.
    for msg in consumer:
        print('Lead = {}'.format(json.loads(msg.value)))
没有任何输出。在向该主题生成数据时(通过 Debezium),我使用的是 avro 转换器(AvroConverter)。我试过网上找到的一些反序列化代码片段,但都不起作用。其中之一是这样的:
# Attempted raw Avro decoding (does not run as-is: `avro` is never imported,
# `schema` is undefined, and `consumer` is a KafkaConsumer object rather than
# the bytes of a single message — hence the TypeError below).
bytes_reader = io.BytesIO(consumer)
decoder = avro.io.BinaryDecoder(bytes_reader)
reader = avro.io.DatumReader(schema)
decoded_data = reader.read(decoder)
在这段代码中,'schema' 变量的值应该从哪里获得?'avro' 包又该如何导入?另外,'io.BytesIO' 还给我报了一个错误:
Traceback (most recent call last):
File "consumer.py", line 19, in <module>
bytes_reader = io.BytesIO(consumer)
TypeError: a bytes-like object is required, not 'KafkaConsumer'
提前致谢!
假设 Debezium 连接器在 Kafka Connect 中使用的是标准的 io.confluent.connect.avro.AvroConverter,那么您需要使用与 Confluent Schema Registry 配套的 Avro 反序列化器。
下面是一个消费者示例(摘自 confluent-kafka-python 官方文档):
from confluent_kafka.avro import AvroConsumer
from confluent_kafka.avro.serializer import SerializerError

# AvroConsumer looks up the writer schema in the Schema Registry and
# deserializes each Confluent-Avro message into a Python dict automatically.
consumer = AvroConsumer({
    'bootstrap.servers': 'mybroker,mybroker2',
    'group.id': 'groupid',
    'schema.registry.url': 'http://127.0.0.1:8081'})
consumer.subscribe(['my_topic'])

while True:
    try:
        msg = consumer.poll(10)
    except SerializerError as e:
        print("Message deserialization failed for {}: {}".format(msg, e))
        break

    if msg is None:
        continue
    if msg.error():
        print("AvroConsumer error: {}".format(msg.error()))
        continue

    print(msg.value())

consumer.close()