Google Dataflow 花费数小时估算输入大小
Google Dataflow spending hours estimating input size
我是 Google Dataflow 的新手,我发现该服务在实际处理数据之前会花费几个小时来估算输入文件的大小,并且通常会在失败之前对大型输入集合进行多次重新计数。我正在使用 Apache Beam 2.9 和 io.ReadFromText 方法。
日志以开始估计输入文件大小的注释开头,并继续记录每 10k 个文件更新一次。
有没有办法跳过这一步或显着加快计数的速度?
Python ReadFromText
来源基于 FileBasedSource
。如果您查看它的代码,您会发现 its estimate_size method is inefficient for a very large set of files.
正如我们在评论中讨论的那样,您可以通过手动划分文件范围来改善此瓶颈。例如,如果您的文件是 gs://my_bucket/file001, gs://my_bucket/file002, ... gs://my_bucket/file999
,您应该可以添加 10 个源,如下所示:
p = Pipeline()
file_lines = (
[p | ReadFromText('gs://my_bucket/file%s*' % i) for i in range(10)]
| beam.Flatten())
这应该有助于您的管道针对这种情况进行扩展。
至于永久性解决方案...我想可以尝试对源代码本身提出改进建议,以便将来的版本可能具有更好的性能。
我们还计划实施基于 Java 的 FileIO
转换的转换。这也可能有帮助。但它在 horizon.
中并不那么近
我是 Google Dataflow 的新手,我发现该服务在实际处理数据之前会花费几个小时来估算输入文件的大小,并且通常会在失败之前对大型输入集合进行多次重新计数。我正在使用 Apache Beam 2.9 和 io.ReadFromText 方法。
日志以开始估计输入文件大小的注释开头,并继续记录每 10k 个文件更新一次。
有没有办法跳过这一步或显着加快计数的速度?
Python ReadFromText
来源基于 FileBasedSource
。如果您查看它的代码,您会发现 its estimate_size method is inefficient for a very large set of files.
正如我们在评论中讨论的那样,您可以通过手动划分文件范围来改善此瓶颈。例如,如果您的文件是 gs://my_bucket/file001, gs://my_bucket/file002, ... gs://my_bucket/file999
,您应该可以添加 10 个源,如下所示:
p = Pipeline()
file_lines = (
[p | ReadFromText('gs://my_bucket/file%s*' % i) for i in range(10)]
| beam.Flatten())
这应该有助于您的管道针对这种情况进行扩展。
至于永久性解决方案...我想可以尝试对源代码本身提出改进建议,以便将来的版本可能具有更好的性能。
我们还计划实施基于 Java 的 FileIO
转换的转换。这也可能有帮助。但它在 horizon.