从 Beam 中的另一个管道读取泡菜?
Read a pickle from another pipeline in Beam?
我是 运行 Cloud Dataflow 中的批处理管道。我需要在一个管道中读取另一个管道之前写入的对象。最简单的 wa 对象是 pickle / dill。
写的很好,写了很多个文件,每个文件都有一个pickle对象。当我手动下载文件时,我可以解压文件。写作代码:beam.io.WriteToText('gs://{}', coder=coders.DillCoder())
但每次读取都中断,并出现以下错误之一。阅读代码:beam.io.ReadFromText('gs://{}*', coder=coders.DillCoder())
要么...
File "/usr/local/lib/python2.7/dist-packages/dill/dill.py", line 266, in load
obj = pik.load()
File "/usr/lib/python2.7/pickle.py", line 858, in load
dispatch[key](self)
KeyError: '\x90'
...或...
File "/usr/local/lib/python2.7/dist-packages/dill/dill.py", line 423, in find_class
return StockUnpickler.find_class(self, module, name)
File "/usr/lib/python2.7/pickle.py", line 1124, in find_class
__import__(module)
ImportError: No module named measur
(对象的 class 位于 measure
的路径中,但不确定为什么它会错过那里的最后一个字符)
我试过使用默认编码器和 BytesCoder
,并将 pickling 和 unpickling 作为管道中的自定义任务。
我的工作假设是 reader 按行拆分文件,因此将单个泡菜(其中有新行)视为多个对象。如果是这样,有没有办法避免这种情况?
我可以尝试自己构建一个 reader,但我很犹豫,因为这似乎是一个很好解决的问题(例如,Beam 已经有一种格式可以将对象从一个管道阶段移动到另一个管道阶段)。
切向相关:
谢谢!
编码为 string_escape
转义换行符,因此 Beam 看到的唯一换行符是泡菜之间的换行符:
class DillMultiCoder(DillCoder):
"""
Coder that allows multi-line pickles to be read
After an object is pickled, the bytes are encoded as `unicode_escape`,
meaning newline characters (`\n`) aren't in the string.
Previously, the presence of newline characters these confues the Dataflow
reader, as it can't discriminate between a new object and a new line
within a pickle string
"""
def _create_impl(self):
return coder_impl.CallbackCoderImpl(
maybe_dill_multi_dumps, maybe_dill_multi_loads)
def maybe_dill_multi_dumps(o):
# in Py3 this needs to be `unicode_escape`
return maybe_dill_dumps(o).encode('string_escape')
def maybe_dill_multi_loads(o):
# in Py3 this needs to be `unicode_escape`
return maybe_dill_loads(o.decode('string_escape'))
对于大泡菜,我还需要将缓冲区大小设置得更高,达到 8MB - 在之前的缓冲区大小 (8kB) 上,一个 120MB 的文件旋转了 2 天 CPU 时间:
class ReadFromTextPickle(ReadFromText):
"""
Same as ReadFromText, but with a really big buffer. With the standard 8KB
buffer, large files can be read on a loop and never finish
Also added DillMultiCoder
"""
def __init__(
self,
file_pattern=None,
min_bundle_size=0,
compression_type=CompressionTypes.AUTO,
strip_trailing_newlines=True,
coder=DillMultiCoder(),
validate=True,
skip_header_lines=0,
**kwargs):
# needs commenting out, not sure why
# super(ReadFromTextPickle, self).__init__(**kwargs)
self._source = _TextSource(
file_pattern,
min_bundle_size,
compression_type,
strip_trailing_newlines=strip_trailing_newlines,
coder=coder,
validate=validate,
skip_header_lines=skip_header_lines,
buffer_size=8000000)
另一种方法是实现从 FileBasedSource
继承的 PickleFileSource
并在文件上调用 pickle.load
- 每次调用都会产生一个新对象。但是 offset_range_tracker
周围有很多并发症,看起来比绝对必要的更有效
ReadFromText
旨在读取文本文件中换行分隔的记录,因此不适合您的用例。实施 FileBasedSource
也不是一个好的解决方案,因为它是为读取具有多条记录的大文件而设计的(并且通常将这些文件拆分成碎片以进行并行处理)。因此,在您的情况下,Python SDK 当前的最佳解决方案是自己实现一个源代码。这可以像读取文件并生成 PCollection
记录的 ParDo
一样简单。如果您的 ParDo
产生大量记录,请考虑在其后添加一个 apache_beam.transforms.util.Reshuffle
步骤,这将允许跑步者更好地并行处理后续步骤。对于 Java SDK,我们有 FileIO
,它已经提供了转换以简化此操作。
我是 运行 Cloud Dataflow 中的批处理管道。我需要在一个管道中读取另一个管道之前写入的对象。最简单的 wa 对象是 pickle / dill。
写的很好,写了很多个文件,每个文件都有一个pickle对象。当我手动下载文件时,我可以解压文件。写作代码:beam.io.WriteToText('gs://{}', coder=coders.DillCoder())
但每次读取都中断,并出现以下错误之一。阅读代码:beam.io.ReadFromText('gs://{}*', coder=coders.DillCoder())
要么...
File "/usr/local/lib/python2.7/dist-packages/dill/dill.py", line 266, in load
obj = pik.load()
File "/usr/lib/python2.7/pickle.py", line 858, in load
dispatch[key](self)
KeyError: '\x90'
...或...
File "/usr/local/lib/python2.7/dist-packages/dill/dill.py", line 423, in find_class
return StockUnpickler.find_class(self, module, name)
File "/usr/lib/python2.7/pickle.py", line 1124, in find_class
__import__(module)
ImportError: No module named measur
(对象的 class 位于 measure
的路径中,但不确定为什么它会错过那里的最后一个字符)
我试过使用默认编码器和 BytesCoder
,并将 pickling 和 unpickling 作为管道中的自定义任务。
我的工作假设是 reader 按行拆分文件,因此将单个泡菜(其中有新行)视为多个对象。如果是这样,有没有办法避免这种情况?
我可以尝试自己构建一个 reader,但我很犹豫,因为这似乎是一个很好解决的问题(例如,Beam 已经有一种格式可以将对象从一个管道阶段移动到另一个管道阶段)。
切向相关:
谢谢!
编码为 string_escape
转义换行符,因此 Beam 看到的唯一换行符是泡菜之间的换行符:
class DillMultiCoder(DillCoder):
"""
Coder that allows multi-line pickles to be read
After an object is pickled, the bytes are encoded as `unicode_escape`,
meaning newline characters (`\n`) aren't in the string.
Previously, the presence of newline characters these confues the Dataflow
reader, as it can't discriminate between a new object and a new line
within a pickle string
"""
def _create_impl(self):
return coder_impl.CallbackCoderImpl(
maybe_dill_multi_dumps, maybe_dill_multi_loads)
def maybe_dill_multi_dumps(o):
# in Py3 this needs to be `unicode_escape`
return maybe_dill_dumps(o).encode('string_escape')
def maybe_dill_multi_loads(o):
# in Py3 this needs to be `unicode_escape`
return maybe_dill_loads(o.decode('string_escape'))
对于大泡菜,我还需要将缓冲区大小设置得更高,达到 8MB - 在之前的缓冲区大小 (8kB) 上,一个 120MB 的文件旋转了 2 天 CPU 时间:
class ReadFromTextPickle(ReadFromText):
"""
Same as ReadFromText, but with a really big buffer. With the standard 8KB
buffer, large files can be read on a loop and never finish
Also added DillMultiCoder
"""
def __init__(
self,
file_pattern=None,
min_bundle_size=0,
compression_type=CompressionTypes.AUTO,
strip_trailing_newlines=True,
coder=DillMultiCoder(),
validate=True,
skip_header_lines=0,
**kwargs):
# needs commenting out, not sure why
# super(ReadFromTextPickle, self).__init__(**kwargs)
self._source = _TextSource(
file_pattern,
min_bundle_size,
compression_type,
strip_trailing_newlines=strip_trailing_newlines,
coder=coder,
validate=validate,
skip_header_lines=skip_header_lines,
buffer_size=8000000)
另一种方法是实现从 FileBasedSource
继承的 PickleFileSource
并在文件上调用 pickle.load
- 每次调用都会产生一个新对象。但是 offset_range_tracker
周围有很多并发症,看起来比绝对必要的更有效
ReadFromText
旨在读取文本文件中换行分隔的记录,因此不适合您的用例。实施 FileBasedSource
也不是一个好的解决方案,因为它是为读取具有多条记录的大文件而设计的(并且通常将这些文件拆分成碎片以进行并行处理)。因此,在您的情况下,Python SDK 当前的最佳解决方案是自己实现一个源代码。这可以像读取文件并生成 PCollection
记录的 ParDo
一样简单。如果您的 ParDo
产生大量记录,请考虑在其后添加一个 apache_beam.transforms.util.Reshuffle
步骤,这将允许跑步者更好地并行处理后续步骤。对于 Java SDK,我们有 FileIO
,它已经提供了转换以简化此操作。