Python 流数据流 "WriteToPubSub" 行为
Python Streaming Dataflow "WriteToPubSub" behaviour
我正在尝试使用流数据流从 PubSub 读取并写入另一个 PubSub。我使用的是 python 3.7.3 版本。管道看起来像这样,
lines = (pipe | "Read from PubSub" >> beam.io.ReadFromPubSub(topic=TOPIC).with_output_types(bytes)
| "Transformation" >> beam.ParDo(PubSubToDict())
| "Write to PubSub" >> beam.io.WriteToPubSub(topic=OUTPUT, with_attributes=False)
)
"Transformation" 这一步是我需要进行一些自定义转换的地方。我确保此转换的输出是字节。像这样,
class PubSubToDict(beam.DoFn):
def process(self, element):
"""pubsub input is a byte string"""
data = element.decode('utf-8')
"""do some custom transform here"""
data = data.encode('utf-8')
return data
现在发布测试消息时,出现这样的错误,
ERROR: Data being published to Pub/Sub must be sent as a bytestring. [while running 'Write to PubSub']
我设法通过像这样返回一个数组来解决这个问题,
return [data]
但我不知道这样做的原因。所以我正在寻找对此的解释。
此致,
普拉萨德
之所以成功,是因为 ParDo 让管道步骤 return 单个输入元素的多个输出元素,因此它期望一个可迭代对象被 returned。
你也可以yield data
我正在尝试使用流数据流从 PubSub 读取并写入另一个 PubSub。我使用的是 python 3.7.3 版本。管道看起来像这样,
lines = (pipe | "Read from PubSub" >> beam.io.ReadFromPubSub(topic=TOPIC).with_output_types(bytes)
| "Transformation" >> beam.ParDo(PubSubToDict())
| "Write to PubSub" >> beam.io.WriteToPubSub(topic=OUTPUT, with_attributes=False)
)
"Transformation" 这一步是我需要进行一些自定义转换的地方。我确保此转换的输出是字节。像这样,
class PubSubToDict(beam.DoFn):
def process(self, element):
"""pubsub input is a byte string"""
data = element.decode('utf-8')
"""do some custom transform here"""
data = data.encode('utf-8')
return data
现在发布测试消息时,出现这样的错误,
ERROR: Data being published to Pub/Sub must be sent as a bytestring. [while running 'Write to PubSub']
我设法通过像这样返回一个数组来解决这个问题,
return [data]
但我不知道这样做的原因。所以我正在寻找对此的解释。
此致, 普拉萨德
之所以成功,是因为 ParDo 让管道步骤 return 单个输入元素的多个输出元素,因此它期望一个可迭代对象被 returned。
你也可以yield data