如何通过 Apache Beam / Google Cloud DataFlow 中的多个 ParDo 转换处理对本地文件的操作
How to handle operations on local files over multiple ParDo transforms in Apache Beam / Google Cloud DataFlow
我正在为 Google Cloud Dataflow 开发一个 ETL 管道,其中我有几个 b运行ching ParDo
t运行sform,每个都需要一个本地音频文件。然后将 b运行ched 结果合并并导出为文本。
这最初是一个 Python 脚本 运行 在我试图使用 GC 数据流进行 VM worker 并行化的一台机器上。
提取过程从单个 GCS 存储桶位置下载文件,然后在 t运行sform 完成后删除它们以控制存储。这是由于预处理模块需要对文件进行本地访问。这可以通过我自己重写一些预处理库来重新设计以处理字节流而不是文件 - 然而,一些尝试并不顺利,我想首先探索如何处理并行本地Apache Beam / GC Dataflow 中的文件操作,以便更好地理解框架。
在这个粗略的实现中,每个 b运行ch 都会下载和删除文件,并进行大量双重处理。在我的实现中,我有 8 个 b运行ches,因此每个文件都被下载和删除了 8 次。是否可以将 GCS 存储桶安装在每个工作人员上而不是从远程下载文件?
或者是否有另一种方法可以确保向工作人员传递对文件的正确引用,以便:
- 单个
DownloadFilesDoFn()
可以下载一批
- 然后将
PCollection
中的本地文件引用扇出到所有 b运行ches
- 然后最后的
CleanUpFilesDoFn()
可以删除它们
- 如何并行化本地文件引用?
如果无法避免本地文件操作,Apache Beam/GC 数据流的最佳 b运行ched ParDo
策略是什么?
为简单起见,我现有的带有两个 b运行ches 的实现的一些示例代码。
# singleton decorator
def singleton(cls):
instances = {}
def getinstance():
if cls not in instances:
instances[cls] = cls()
return instances[cls]
return getinstance
@singleton
class Predict():
def __init__(self, model):
'''
Process audio, reads in filename
Returns Prediction
'''
self.model = model
def process(self, filename):
#simplified pseudocode
audio = preprocess.load(filename=filename)
prediction = inference(self.model, audio)
return prediction
class PredictDoFn(beam.DoFn):
def __init__(self, model):
self.localfile, self.model = "", model
def process(self, element):
# Construct Predict() object singleton per worker
predict = Predict(self.model)
subprocess.run(['gsutil','cp',element['GCSPath'],'./'], cwd=cwd, shell=False)
self.localfile = cwd + "/" + element['GCSPath'].split('/')[-1]
res = predict.process(self.localfile)
return [{
'Index': element['Index'],
'Title': element['Title'],
'File' : element['GCSPath'],
self.model + 'Prediction': res
}]
def finish_bundle(self):
subprocess.run(['rm',self.localfile], cwd=cwd, shell=False)
# DoFn to split csv into elements (GSC bucket could be read as a PCollection instead maybe)
class Split(beam.DoFn):
def process(self, element):
Index,Title,GCSPath = element.split(",")
GCSPath = 'gs://mybucket/'+ GCSPath
return [{
'Index': int(Index),
'Title': Title,
'GCSPath': GCSPath
}]
管道的简化版本:
with beam.Pipeline(argv=pipeline_args) as p:
files =
(
p | 'Read From CSV' >> beam.io.ReadFromText(known_args.input)
| 'Parse CSV into Dict' >> beam.ParDo(Split())
)
# prediction 1 branch
preds1 =
(
files | 'Prediction 1' >> beam.ParDo(PredictDoFn(model1))
)
# prediction 2 branch
preds2 =
(
files | 'Prediction 2' >> beam.ParDo(PredictDoFn(model2))
)
# join branches
joined = { preds1, preds2 }
# output to file
output =
(
joined | 'WriteToText' >> beam.io.Write(beam.io.textio.WriteToText(known_args.output))
)
为了避免重复下载文件,可以将文件内容放入pCollection
class DownloadFilesDoFn(beam.DoFn):
def __init__(self):
import re
self.gcs_path_regex = re.compile(r'gs:\/\/([^\/]+)\/(.*)')
def start_bundle(self):
import google.cloud.storage
self.gcs = google.cloud.storage.Client()
def process(self, element):
file_match = self.gcs_path_regex.match(element['GCSPath'])
bucket = self.gcs.get_bucket(file_match.group(1))
blob = bucket.get_blob(file_match.group(2))
element['file_contents'] = blob.download_as_bytes()
yield element
然后 PredictDoFn 变为:
class PredictDoFn(beam.DoFn):
def __init__(self, model):
self.model = model
def start_bundle(self):
self.predict = Predict(self.model)
def process(self, element):
res = self.predict.process(element['file_contents'])
return [{
'Index': element['Index'],
'Title': element['Title'],
'File' : element['GCSPath'],
self.model + 'Prediction': res
}]
和管道:
with beam.Pipeline(argv=pipeline_args) as p:
files =
(
p | 'Read From CSV' >> beam.io.ReadFromText(known_args.input)
| 'Parse CSV into Dict' >> beam.ParDo(Split())
| 'Read files' >> beam.ParDo(DownloadFilesDoFn())
)
# prediction 1 branch
preds1 =
(
files | 'Prediction 1' >> beam.ParDo(PredictDoFn(model1))
)
# prediction 2 branch
preds2 =
(
files | 'Prediction 2' >> beam.ParDo(PredictDoFn(model2))
)
# join branches
joined = { preds1, preds2 }
# output to file
output =
(
joined | 'WriteToText' >> beam.io.Write(beam.io.textio.WriteToText(known_args.output))
)
我正在为 Google Cloud Dataflow 开发一个 ETL 管道,其中我有几个 b运行ching ParDo
t运行sform,每个都需要一个本地音频文件。然后将 b运行ched 结果合并并导出为文本。
这最初是一个 Python 脚本 运行 在我试图使用 GC 数据流进行 VM worker 并行化的一台机器上。
提取过程从单个 GCS 存储桶位置下载文件,然后在 t运行sform 完成后删除它们以控制存储。这是由于预处理模块需要对文件进行本地访问。这可以通过我自己重写一些预处理库来重新设计以处理字节流而不是文件 - 然而,一些尝试并不顺利,我想首先探索如何处理并行本地Apache Beam / GC Dataflow 中的文件操作,以便更好地理解框架。
在这个粗略的实现中,每个 b运行ch 都会下载和删除文件,并进行大量双重处理。在我的实现中,我有 8 个 b运行ches,因此每个文件都被下载和删除了 8 次。是否可以将 GCS 存储桶安装在每个工作人员上而不是从远程下载文件?
或者是否有另一种方法可以确保向工作人员传递对文件的正确引用,以便:
- 单个
DownloadFilesDoFn()
可以下载一批 - 然后将
PCollection
中的本地文件引用扇出到所有 b运行ches - 然后最后的
CleanUpFilesDoFn()
可以删除它们 - 如何并行化本地文件引用?
如果无法避免本地文件操作,Apache Beam/GC 数据流的最佳 b运行ched ParDo
策略是什么?
为简单起见,我现有的带有两个 b运行ches 的实现的一些示例代码。
# singleton decorator
def singleton(cls):
instances = {}
def getinstance():
if cls not in instances:
instances[cls] = cls()
return instances[cls]
return getinstance
@singleton
class Predict():
def __init__(self, model):
'''
Process audio, reads in filename
Returns Prediction
'''
self.model = model
def process(self, filename):
#simplified pseudocode
audio = preprocess.load(filename=filename)
prediction = inference(self.model, audio)
return prediction
class PredictDoFn(beam.DoFn):
def __init__(self, model):
self.localfile, self.model = "", model
def process(self, element):
# Construct Predict() object singleton per worker
predict = Predict(self.model)
subprocess.run(['gsutil','cp',element['GCSPath'],'./'], cwd=cwd, shell=False)
self.localfile = cwd + "/" + element['GCSPath'].split('/')[-1]
res = predict.process(self.localfile)
return [{
'Index': element['Index'],
'Title': element['Title'],
'File' : element['GCSPath'],
self.model + 'Prediction': res
}]
def finish_bundle(self):
subprocess.run(['rm',self.localfile], cwd=cwd, shell=False)
# DoFn to split csv into elements (GSC bucket could be read as a PCollection instead maybe)
class Split(beam.DoFn):
def process(self, element):
Index,Title,GCSPath = element.split(",")
GCSPath = 'gs://mybucket/'+ GCSPath
return [{
'Index': int(Index),
'Title': Title,
'GCSPath': GCSPath
}]
管道的简化版本:
with beam.Pipeline(argv=pipeline_args) as p:
files =
(
p | 'Read From CSV' >> beam.io.ReadFromText(known_args.input)
| 'Parse CSV into Dict' >> beam.ParDo(Split())
)
# prediction 1 branch
preds1 =
(
files | 'Prediction 1' >> beam.ParDo(PredictDoFn(model1))
)
# prediction 2 branch
preds2 =
(
files | 'Prediction 2' >> beam.ParDo(PredictDoFn(model2))
)
# join branches
joined = { preds1, preds2 }
# output to file
output =
(
joined | 'WriteToText' >> beam.io.Write(beam.io.textio.WriteToText(known_args.output))
)
为了避免重复下载文件,可以将文件内容放入pCollection
class DownloadFilesDoFn(beam.DoFn):
def __init__(self):
import re
self.gcs_path_regex = re.compile(r'gs:\/\/([^\/]+)\/(.*)')
def start_bundle(self):
import google.cloud.storage
self.gcs = google.cloud.storage.Client()
def process(self, element):
file_match = self.gcs_path_regex.match(element['GCSPath'])
bucket = self.gcs.get_bucket(file_match.group(1))
blob = bucket.get_blob(file_match.group(2))
element['file_contents'] = blob.download_as_bytes()
yield element
然后 PredictDoFn 变为:
class PredictDoFn(beam.DoFn):
def __init__(self, model):
self.model = model
def start_bundle(self):
self.predict = Predict(self.model)
def process(self, element):
res = self.predict.process(element['file_contents'])
return [{
'Index': element['Index'],
'Title': element['Title'],
'File' : element['GCSPath'],
self.model + 'Prediction': res
}]
和管道:
with beam.Pipeline(argv=pipeline_args) as p:
files =
(
p | 'Read From CSV' >> beam.io.ReadFromText(known_args.input)
| 'Parse CSV into Dict' >> beam.ParDo(Split())
| 'Read files' >> beam.ParDo(DownloadFilesDoFn())
)
# prediction 1 branch
preds1 =
(
files | 'Prediction 1' >> beam.ParDo(PredictDoFn(model1))
)
# prediction 2 branch
preds2 =
(
files | 'Prediction 2' >> beam.ParDo(PredictDoFn(model2))
)
# join branches
joined = { preds1, preds2 }
# output to file
output =
(
joined | 'WriteToText' >> beam.io.Write(beam.io.textio.WriteToText(known_args.output))
)