作为数据流端输入的大型 numpy 矩阵
large numpy matrix as dataflow side input
我正在尝试在 Python 中编写数据流管道,它需要一个大的 numpy 矩阵作为辅助输入。矩阵保存在云存储中。理想情况下,每个 Dataflow worker 都会直接从云存储中加载矩阵。
我的理解是,如果我说matrix = np.load(LOCAL_PATH_TO_MATRIX)
,然后
p | "computation" >> beam.Map(computation, matrix)
矩阵从我的笔记本电脑发送到每个 Datflow 工作人员。
我怎样才能让每个工作人员直接从云存储中加载矩阵? "binary blob"有光束源吗?
你的做法是正确的。
在这种情况下,Dataflow 所做的是将 NumPy 矩阵作为辅助输入处理。这意味着它从您的机器上传到服务一次,Dataflow 服务会将它发送给每个工作人员。
鉴于矩阵很大,这将使您的工作人员使用 I/O 从服务接收它,并承担将整个矩阵保存在内存中的负担,但它应该可以工作。
如果你想避免computing/loading你机器中的矩阵,你可以将你的矩阵作为文本文件上传到 GCS,读取该文件,并获得矩阵。你可以这样做:
matrix_file = 'gs://mybucket/my/matrix'
p | beam.ParDo(ComputationDoFn(matrix_file))
你的 DoFn 可能是这样的:
class ComputationDoFn(beam.DoFn):
def __init__(self, matrix_file):
self._matrix_file = matrix_file
self._matrix = None
def start_bundle(self, element):
# We check because one DoFn instance may be reused
# for different bundles.
if self._matrix is None:
self.load_matrix(self._matrix_file)
def process(self, element):
# Now process the element
def load_matrix(self, matrix_file):
# Load the file from GCS using the GCS API
我希望这是有道理的。如果您觉得需要更多帮助,我可以充实功能。
我正在尝试在 Python 中编写数据流管道,它需要一个大的 numpy 矩阵作为辅助输入。矩阵保存在云存储中。理想情况下,每个 Dataflow worker 都会直接从云存储中加载矩阵。
我的理解是,如果我说matrix = np.load(LOCAL_PATH_TO_MATRIX)
,然后
p | "computation" >> beam.Map(computation, matrix)
矩阵从我的笔记本电脑发送到每个 Datflow 工作人员。
我怎样才能让每个工作人员直接从云存储中加载矩阵? "binary blob"有光束源吗?
你的做法是正确的。
在这种情况下,Dataflow 所做的是将 NumPy 矩阵作为辅助输入处理。这意味着它从您的机器上传到服务一次,Dataflow 服务会将它发送给每个工作人员。
鉴于矩阵很大,这将使您的工作人员使用 I/O 从服务接收它,并承担将整个矩阵保存在内存中的负担,但它应该可以工作。
如果你想避免computing/loading你机器中的矩阵,你可以将你的矩阵作为文本文件上传到 GCS,读取该文件,并获得矩阵。你可以这样做:
matrix_file = 'gs://mybucket/my/matrix'
p | beam.ParDo(ComputationDoFn(matrix_file))
你的 DoFn 可能是这样的:
class ComputationDoFn(beam.DoFn):
def __init__(self, matrix_file):
self._matrix_file = matrix_file
self._matrix = None
def start_bundle(self, element):
# We check because one DoFn instance may be reused
# for different bundles.
if self._matrix is None:
self.load_matrix(self._matrix_file)
def process(self, element):
# Now process the element
def load_matrix(self, matrix_file):
# Load the file from GCS using the GCS API
我希望这是有道理的。如果您觉得需要更多帮助,我可以充实功能。