从数据流中的 PubSub 读取 AVRO 消息 Python
Read AVRO messages from PubSub in Dataflow Python
我需要从另一个 GCP 项目的 PubSub 主题中读取 AVRO 消息。我之前实现了 Python 数据流管道,它从 PubSub 读取 JSON 消息并写入 BigQuery。但我不熟悉处理 AVRO 消息。我试图查找 AVRO 的 Python 文档,它指向我这个 link https://avro.apache.org/docs/current/gettingstartedpython.html
在这个 link 中有读取文件和写入文件的示例,但我认为这些函数对从 PubSub 读取没有用。我正在使用下面的转换从 PubSub 读取,其中输出是字节串。
"Read from PubSub" >> beam.io.ReadFromPubSub(topic=TOPIC).with_output_types(bytes)
我需要一种方法来读取这些字节(AVRO 格式)
这是您可以使用的示例代码
- 阅读来自Pub/Sub
的消息
from fastavro import parse_schema, schemaless_reader
messages = (p
| beam.io.ReadFromPubSub(
subscription=known_args.input_subscription)
.with_output_types(bytes))
- 使用 Fastavro 包通过 Class 定义
来定义架构和 reader
class AvroReader:
def __init__(self, schema):
self.schema = schema
def deserialize(self, record):
bytes_reader = io.BytesIO(record)
dict_record = schemaless_reader(bytes_reader, self.schema)
return dict_record
- 现在映射字节元素并指定模式
schema = avro.schema.parse(open("avro.avsc", "rb").read())
avroReader = AvroReader(schema)
lines = messages | "decode" >> beam.Map(lambda input: avroReader.deserialize(input))
这些行应该有 PCollection
Avro 的形式。
我需要从另一个 GCP 项目的 PubSub 主题中读取 AVRO 消息。我之前实现了 Python 数据流管道,它从 PubSub 读取 JSON 消息并写入 BigQuery。但我不熟悉处理 AVRO 消息。我试图查找 AVRO 的 Python 文档,它指向我这个 link https://avro.apache.org/docs/current/gettingstartedpython.html
在这个 link 中有读取文件和写入文件的示例,但我认为这些函数对从 PubSub 读取没有用。我正在使用下面的转换从 PubSub 读取,其中输出是字节串。
"Read from PubSub" >> beam.io.ReadFromPubSub(topic=TOPIC).with_output_types(bytes)
我需要一种方法来读取这些字节(AVRO 格式)
这是您可以使用的示例代码
- 阅读来自Pub/Sub 的消息
from fastavro import parse_schema, schemaless_reader
messages = (p
| beam.io.ReadFromPubSub(
subscription=known_args.input_subscription)
.with_output_types(bytes))
- 使用 Fastavro 包通过 Class 定义 来定义架构和 reader
class AvroReader:
def __init__(self, schema):
self.schema = schema
def deserialize(self, record):
bytes_reader = io.BytesIO(record)
dict_record = schemaless_reader(bytes_reader, self.schema)
return dict_record
- 现在映射字节元素并指定模式
schema = avro.schema.parse(open("avro.avsc", "rb").read())
avroReader = AvroReader(schema)
lines = messages | "decode" >> beam.Map(lambda input: avroReader.deserialize(input))
这些行应该有 PCollection
Avro 的形式。