使用 bottledwater-pg,Python 消费者如何读取数据?
With bottledwater-pg, how to read data by a Python consumer?
我在Python写了一个消费者如下:
from kafka import KafkaConsumer
import avro.schema
import avro.io
import io
# To consume messages
consumer = KafkaConsumer('test',
group_id='',
bootstrap_servers=['kafka:9092'])
schema = """
{
"namespace":"com.martinkl.bottledwater.dbschema.public",
"type":"record",
"name":"test",
"fields":[
{"name":"id","type":["int", "null"]},
{"name":"value","type":["string", "null"]}
]
}
"""
schema = avro.schema.parse(schema)
for msg in consumer:
bytes_reader = io.BytesIO(msg.value)
decoder = avro.io.BinaryDecoder(bytes_reader)
reader = avro.io.DatumReader(schema)
hello = reader.read(decoder)
print hello
一切似乎都很好,但是当我 运行 向 Postgres 插入数据时:
postgres=# insert into test (value) values('hello world!');
消费者输出为空:
$ python consumer_bottledwater-pg.py
{u'id': 0, u'value': u''}
请帮我解决一下。提前谢谢你。
Bottled Water 发布到 Kafka 的 Avro-encoded 消息以 5 字节 header 为前缀。第一个字节始终为零(保留供将来使用),接下来的 4 个字节是一个 big-endian 32 位数字,指示模式的 ID。
在您的示例中,您在 Python 应用程序中有 hard-coded 架构,但一旦上游数据库架构发生变化,该方法就会失效。这就是为什么最好将 Bottled Water 与 schema registry. When you read a message from Kafka you first decode the header to find the schema ID, and if you haven't seen that schema ID before, you query the registry 结合使用来查找架构。然后您可以使用该模式解码消息的其余部分。模式可以缓存在消费者中,因为注册表保证特定 ID 的模式是不可变的。
您还可以查看架构注册表随附的 KafkaAvroDeserializer 代码,了解如何在 Java 中完成解码。您可以在 Python.
中执行相同的操作
非常感谢@Martin Kleppmann。我按照你的指导做了。它工作正常。
value = bytearray(msg.value)
bytes_reader = io.BytesIO(value[5:])
decoder = avro.io.BinaryDecoder(bytes_reader)
reader = avro.io.DatumReader(schema)
hello = reader.read(decoder)
print hello
我在Python写了一个消费者如下:
from kafka import KafkaConsumer
import avro.schema
import avro.io
import io
# To consume messages
consumer = KafkaConsumer('test',
group_id='',
bootstrap_servers=['kafka:9092'])
schema = """
{
"namespace":"com.martinkl.bottledwater.dbschema.public",
"type":"record",
"name":"test",
"fields":[
{"name":"id","type":["int", "null"]},
{"name":"value","type":["string", "null"]}
]
}
"""
schema = avro.schema.parse(schema)
for msg in consumer:
bytes_reader = io.BytesIO(msg.value)
decoder = avro.io.BinaryDecoder(bytes_reader)
reader = avro.io.DatumReader(schema)
hello = reader.read(decoder)
print hello
一切似乎都很好,但是当我 运行 向 Postgres 插入数据时:
postgres=# insert into test (value) values('hello world!');
消费者输出为空:
$ python consumer_bottledwater-pg.py
{u'id': 0, u'value': u''}
请帮我解决一下。提前谢谢你。
Bottled Water 发布到 Kafka 的 Avro-encoded 消息以 5 字节 header 为前缀。第一个字节始终为零(保留供将来使用),接下来的 4 个字节是一个 big-endian 32 位数字,指示模式的 ID。
在您的示例中,您在 Python 应用程序中有 hard-coded 架构,但一旦上游数据库架构发生变化,该方法就会失效。这就是为什么最好将 Bottled Water 与 schema registry. When you read a message from Kafka you first decode the header to find the schema ID, and if you haven't seen that schema ID before, you query the registry 结合使用来查找架构。然后您可以使用该模式解码消息的其余部分。模式可以缓存在消费者中,因为注册表保证特定 ID 的模式是不可变的。
您还可以查看架构注册表随附的 KafkaAvroDeserializer 代码,了解如何在 Java 中完成解码。您可以在 Python.
中执行相同的操作非常感谢@Martin Kleppmann。我按照你的指导做了。它工作正常。
value = bytearray(msg.value)
bytes_reader = io.BytesIO(value[5:])
decoder = avro.io.BinaryDecoder(bytes_reader)
reader = avro.io.DatumReader(schema)
hello = reader.read(decoder)
print hello