作为数据流端输入的大型 numpy 矩阵

large numpy matrix as dataflow side input

我正在尝试在 Python 中编写数据流管道,它需要一个大的 numpy 矩阵作为辅助输入。矩阵保存在云存储中。理想情况下,每个 Dataflow worker 都会直接从云存储中加载矩阵。

我的理解是,如果我说matrix = np.load(LOCAL_PATH_TO_MATRIX),然后

p | "computation" >> beam.Map(computation, matrix)

矩阵从我的笔记本电脑发送到每个 Datflow 工作人员。

我怎样才能让每个工作人员直接从云存储中加载矩阵? "binary blob"有光束源吗?

你的做法是正确的。

在这种情况下,Dataflow 所做的是将 NumPy 矩阵作为辅助输入处理。这意味着它从您的机器上传到服务一次,Dataflow 服务会将它发送给每个工作人员。

鉴于矩阵很大,这将使您的工作人员使用 I/O 从服务接收它,并承担将整个矩阵保存在内存中的负担,但它应该可以工作。


如果你想避免computing/loading你机器中的矩阵,你可以将你的矩阵作为文本文件上传到 GCS,读取该文件,并获得矩阵。你可以这样做:

matrix_file = 'gs://mybucket/my/matrix'
p | beam.ParDo(ComputationDoFn(matrix_file))

你的 DoFn 可能是这样的:

class ComputationDoFn(beam.DoFn):
  def __init__(self, matrix_file):
    self._matrix_file = matrix_file
    self._matrix = None

  def start_bundle(self, element):
    # We check because one DoFn instance may be reused
    # for different bundles.
    if self._matrix is None:
      self.load_matrix(self._matrix_file)

  def process(self, element):
    # Now process the element

  def load_matrix(self, matrix_file):
    # Load the file from GCS using the GCS API

我希望这是有道理的。如果您觉得需要更多帮助,我可以充实功能。