在 Apache Beam / Dataflow Python 流中写入文本文件
Writing to text files in Apache Beam / Dataflow Python streaming
我有一个非常基本的 Python 数据流作业,它从 Pub/Sub 读取一些数据,应用 FixedWindow 并写入 Google 云存储。
transformed = ...
transformed | beam.io.WriteToText(known_args.output)
输出被写入--output中指定的位置,但只是临时阶段,即
gs://MY_BUCKET/MY_DIR/beam-temp-2a5c0e1eec1c11e8b98342010a800004/...some_UUID...
文件永远不会被放置到使用分片模板正确命名的位置。
在本地和 DataFlow runner 上测试。
进一步测试时,我注意到 streaming_wordcount 示例也有同样的问题,但是标准的 wordcount 示例写得很好。也许问题在于窗口或从 pubsub 读取?
WriteToText 似乎与 PubSub 的流媒体源不兼容。可能有解决方法,或者 Java 版本可能兼容,但我选择完全使用不同的解决方案。
Python 流式管道执行在实验上可用(有一些限制)。
不支持的功能适用于所有跑步者。
状态和计时器 APIs,
自定义来源 API,
可拆分的 DoFn API,
处理迟到的数据,
用户自定义WindowFn.
此外,DataflowRunner 目前不支持 Python 流式执行的以下 Cloud Dataflow 特定功能。
流式自动缩放
更新现有管道
云数据流模板
一些监控功能,例如毫秒计数器、显示数据、指标和转换的元素计数。但是,支持源的日志记录、水印和元素计数。
https://beam.apache.org/documentation/sdks/python-streaming/
由于您正在使用 FixedWindowFn 并且管道能够将数据写入 tmp 位置,请重新检查输出位置 --output gs://<your-gcs-bucket>/<you-gcs-folder>/<your-gcs-output-filename>
Python SDK 中的 WriteToText
转换不支持流。
相反,您可以考虑 apache_beam.io.fileio
中的转换。在这种情况下,您可以这样写(假设 10 分钟 windows):
my_pcollection = (p | ReadFromPubSub(....)
| WindowInto(FixedWindows(10*60))
| fileio.WriteToFiles(path=known_args.output))
这足以为每个 window 写出单独的文件,并随着流的推进继续这样做。
您会看到这样的文件(假设输出为 gs://mybucket/
)。文件将在 windows 被触发时打印:
gs://mybucket/output-1970-01-01T00_00_00-1970-01-01T00_10_00-0000-00002
gs://mybucket/output-1970-01-01T00_00_00-1970-01-01T00_10_00-0001-00002
gs://mybucket/output-1970-01-01T00_10_00-1970-01-01T00_20_00-0000-00002
gs://mybucket/output-1970-01-01T00_10_00-1970-01-01T00_20_00-0001-00002
...
默认情况下,文件有 $prefix-$start-$end-$pane-$shard-of-$numShards$suffix$compressionSuffix
个名称 - 默认情况下前缀为 output
,但您可以为文件命名传递更复杂的函数。
如果您想自定义文件的写入方式(例如文件的命名、数据的格式等),您可以查看 WriteToFiles
中的额外参数。
您可以看到在 Beam 测试中使用的转换示例 here,参数更复杂 - 但听起来默认行为对您来说应该足够了。
我有一个非常基本的 Python 数据流作业,它从 Pub/Sub 读取一些数据,应用 FixedWindow 并写入 Google 云存储。
transformed = ...
transformed | beam.io.WriteToText(known_args.output)
输出被写入--output中指定的位置,但只是临时阶段,即
gs://MY_BUCKET/MY_DIR/beam-temp-2a5c0e1eec1c11e8b98342010a800004/...some_UUID...
文件永远不会被放置到使用分片模板正确命名的位置。
在本地和 DataFlow runner 上测试。
进一步测试时,我注意到 streaming_wordcount 示例也有同样的问题,但是标准的 wordcount 示例写得很好。也许问题在于窗口或从 pubsub 读取?
WriteToText 似乎与 PubSub 的流媒体源不兼容。可能有解决方法,或者 Java 版本可能兼容,但我选择完全使用不同的解决方案。
Python 流式管道执行在实验上可用(有一些限制)。
不支持的功能适用于所有跑步者。 状态和计时器 APIs, 自定义来源 API, 可拆分的 DoFn API, 处理迟到的数据, 用户自定义WindowFn.
此外,DataflowRunner 目前不支持 Python 流式执行的以下 Cloud Dataflow 特定功能。
流式自动缩放 更新现有管道 云数据流模板 一些监控功能,例如毫秒计数器、显示数据、指标和转换的元素计数。但是,支持源的日志记录、水印和元素计数。
https://beam.apache.org/documentation/sdks/python-streaming/
由于您正在使用 FixedWindowFn 并且管道能够将数据写入 tmp 位置,请重新检查输出位置 --output gs://<your-gcs-bucket>/<you-gcs-folder>/<your-gcs-output-filename>
Python SDK 中的 WriteToText
转换不支持流。
相反,您可以考虑 apache_beam.io.fileio
中的转换。在这种情况下,您可以这样写(假设 10 分钟 windows):
my_pcollection = (p | ReadFromPubSub(....)
| WindowInto(FixedWindows(10*60))
| fileio.WriteToFiles(path=known_args.output))
这足以为每个 window 写出单独的文件,并随着流的推进继续这样做。
您会看到这样的文件(假设输出为 gs://mybucket/
)。文件将在 windows 被触发时打印:
gs://mybucket/output-1970-01-01T00_00_00-1970-01-01T00_10_00-0000-00002
gs://mybucket/output-1970-01-01T00_00_00-1970-01-01T00_10_00-0001-00002
gs://mybucket/output-1970-01-01T00_10_00-1970-01-01T00_20_00-0000-00002
gs://mybucket/output-1970-01-01T00_10_00-1970-01-01T00_20_00-0001-00002
...
默认情况下,文件有 $prefix-$start-$end-$pane-$shard-of-$numShards$suffix$compressionSuffix
个名称 - 默认情况下前缀为 output
,但您可以为文件命名传递更复杂的函数。
如果您想自定义文件的写入方式(例如文件的命名、数据的格式等),您可以查看 WriteToFiles
中的额外参数。
您可以看到在 Beam 测试中使用的转换示例 here,参数更复杂 - 但听起来默认行为对您来说应该足够了。