从 Beam 中的另一个管道读取泡菜?

Read a pickle from another pipeline in Beam?

我是 运行 Cloud Dataflow 中的批处理管道。我需要在一个管道中读取另一个管道之前写入的对象。最简单的 wa 对象是 pickle / dill。

写的很好,写了很多个文件,每个文件都有一个pickle对象。当我手动下载文件时,我可以解压文件。写作代码:beam.io.WriteToText('gs://{}', coder=coders.DillCoder())

但每次读取都中断,并出现以下错误之一。阅读代码:beam.io.ReadFromText('gs://{}*', coder=coders.DillCoder())

要么...

  File "/usr/local/lib/python2.7/dist-packages/dill/dill.py", line 266, in load
    obj = pik.load()
  File "/usr/lib/python2.7/pickle.py", line 858, in load
    dispatch[key](self)
KeyError: '\x90'

...或...

  File "/usr/local/lib/python2.7/dist-packages/dill/dill.py", line 423, in find_class
    return StockUnpickler.find_class(self, module, name)
  File "/usr/lib/python2.7/pickle.py", line 1124, in find_class
    __import__(module)
ImportError: No module named measur

(对象的 class 位于 measure 的路径中,但不确定为什么它会错过那里的最后一个字符)

我试过使用默认编码器和 BytesCoder,并将 pickling 和 unpickling 作为管道中的自定义任务。

我的工作假设是 reader 按行拆分文件,因此将单个泡菜(其中有新行)视为多个对象。如果是这样,有没有办法避免这种情况?

我可以尝试自己构建一个 reader,但我很犹豫,因为这似乎是一个很好解决的问题(例如,Beam 已经有一种格式可以将对象从一个管道阶段移动到另一个管道阶段)。

切向相关:

谢谢!

编码为 string_escape 转义换行符,因此 Beam 看到的唯一换行符是泡菜之间的换行符:

class DillMultiCoder(DillCoder):
    """
    Coder that allows multi-line pickles to be read
    After an object is pickled, the bytes are encoded as `unicode_escape`,
    meaning newline characters (`\n`) aren't in the string.

    Previously, the presence of newline characters these confues the Dataflow
    reader, as it can't discriminate between a new object and a new line
    within a pickle string
    """

    def _create_impl(self):
        return coder_impl.CallbackCoderImpl(
            maybe_dill_multi_dumps, maybe_dill_multi_loads)


def maybe_dill_multi_dumps(o):
    # in Py3 this needs to be `unicode_escape`
    return maybe_dill_dumps(o).encode('string_escape')


def maybe_dill_multi_loads(o):
    # in Py3 this needs to be `unicode_escape`
    return maybe_dill_loads(o.decode('string_escape'))

对于大泡菜,我还需要将缓冲区大小设置得更高,达到 8MB - 在之前的缓冲区大小 (8kB) 上,一个 120MB 的文件旋转了 2 天 CPU 时间:

class ReadFromTextPickle(ReadFromText):
    """
    Same as ReadFromText, but with a really big buffer. With the standard 8KB
    buffer, large files can be read on a loop and never finish

    Also added DillMultiCoder
    """

    def __init__(
            self,
            file_pattern=None,
            min_bundle_size=0,
            compression_type=CompressionTypes.AUTO,
            strip_trailing_newlines=True,
            coder=DillMultiCoder(),
            validate=True,
            skip_header_lines=0,
            **kwargs):
        # needs commenting out, not sure why    
        # super(ReadFromTextPickle, self).__init__(**kwargs)
        self._source = _TextSource(
            file_pattern,
            min_bundle_size,
            compression_type,
            strip_trailing_newlines=strip_trailing_newlines,
            coder=coder,
            validate=validate,
            skip_header_lines=skip_header_lines,
            buffer_size=8000000)

另一种方法是实现从 FileBasedSource 继承的 PickleFileSource 并在文件上调用 pickle.load - 每次调用都会产生一个新对象。但是 offset_range_tracker 周围有很多并发症,看起来比绝对必要的更有效

ReadFromText 旨在读取文本文件中换行分隔的记录,因此不适合您的用例。实施 FileBasedSource 也不是一个好的解决方案,因为它是为读取具有多条记录的大文件而设计的(并且通常将这些文件拆分成碎片以进行并行处理)。因此,在您的情况下,Python SDK 当前的最佳解决方案是自己实现一个源代码。这可以像读取文件并生成 PCollection 记录的 ParDo 一样简单。如果您的 ParDo 产生大量记录,请考虑在其后添加一个 apache_beam.transforms.util.Reshuffle 步骤,这将允许跑步者更好地并行处理后续步骤。对于 Java SDK,我们有 FileIO,它已经提供了转换以简化此操作。