将列表转换为 PCollection
Convert a list into a PCollection
我目前有一个 DoFn
查看存储桶并查看该存储桶和目录前缀中的所有文件。此 DoFn
returns 列表而不是 PCollection
。我如何将这个列表转换成 PCollection
可以被 DoFn
ConvertFileNames
使用?
# List all the files within a subdir
class ListBlobs(beam.DoFn):
def start_bundle(self):
self.storage_client = storage.Client()
def process(self, prefix):
bucket = self.storage_client.bucket('xxx')
return list(self.bucket.list_blobs(prefix=prefix))
# Convert Blobs into filenames as patterns
class ConvertFileNames(beam.DoFn):
def process(self, blob):
return 'gs://' + blob.bucket.name + blob.name
如 beam documentation 中所述,Beam DoFn 的处理方法 returns 可迭代的元素放置到下游 PCollection 中。所以,在你的例子中,如果我有一个 PCollection 前缀,称它为 prefix_pcoll
,那么我可以写
blobs_pcoll = prefix_pcoll | beam.ParDo(ListBlobs())
和 blobs_pcoll
将包含具有此前缀的 blob 列表(即 list(self.bucket.list_blobs(prefix=prefix))
对所有前缀的串联)。然后你可以写
converted = blobs_pcoll | beam.ParDo(ConvertFileNames())
你也可以这样写
converted = blobs_pcoll | beam.Map(
lambda blob: 'gs://' + blob.bucket.name + blob.name)
您可能还想查看 apache_beam.io.fileio.MatchAll。
我目前有一个 DoFn
查看存储桶并查看该存储桶和目录前缀中的所有文件。此 DoFn
returns 列表而不是 PCollection
。我如何将这个列表转换成 PCollection
可以被 DoFn
ConvertFileNames
使用?
# List all the files within a subdir
class ListBlobs(beam.DoFn):
def start_bundle(self):
self.storage_client = storage.Client()
def process(self, prefix):
bucket = self.storage_client.bucket('xxx')
return list(self.bucket.list_blobs(prefix=prefix))
# Convert Blobs into filenames as patterns
class ConvertFileNames(beam.DoFn):
def process(self, blob):
return 'gs://' + blob.bucket.name + blob.name
如 beam documentation 中所述,Beam DoFn 的处理方法 returns 可迭代的元素放置到下游 PCollection 中。所以,在你的例子中,如果我有一个 PCollection 前缀,称它为 prefix_pcoll
,那么我可以写
blobs_pcoll = prefix_pcoll | beam.ParDo(ListBlobs())
和 blobs_pcoll
将包含具有此前缀的 blob 列表(即 list(self.bucket.list_blobs(prefix=prefix))
对所有前缀的串联)。然后你可以写
converted = blobs_pcoll | beam.ParDo(ConvertFileNames())
你也可以这样写
converted = blobs_pcoll | beam.Map(
lambda blob: 'gs://' + blob.bucket.name + blob.name)
您可能还想查看 apache_beam.io.fileio.MatchAll。