How to read multiple JSON files from a GCS bucket in Google Dataflow (Apache Beam Python)

I have a bucket in GCS containing a list of JSON files. I extract the list of file names using:

from google.cloud import storage

def list_blobs(bucket_name):
    """Lists all the blobs in the bucket as full gs:// paths."""
    storage_client = storage.Client()
    blobs = storage_client.list_blobs(bucket_name)
    json_paths = []
    for blob in blobs:
        json_paths.append(f"gs://{bucket_name}/{blob.name}")
    return json_paths

Now I want to pass this list of file names to Apache Beam so it reads them. I wrote this code, but it doesn't seem like a good pattern:

final_result = []
for i, file in enumerate(list_files):
    print("parsing file:", file)
    concat_data = (p | 'Data {}'.format(i) >> ReadFromText(file))
    final_result.append(concat_data)

Have you run into the same problem before?

Take a look at this link. ReadFromText is a PTransform that reads text files. ReadAllFromText, on the other hand, is a PTransform that reads a PCollection of text files: it takes a PCollection of file names or file patterns and produces a PCollection of strings, one per line read.
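
For reference, a minimal sketch of that approach, assuming list_files holds full gs:// paths (as produced by the first version of list_blobs above) and that the files are newline-delimited, so each output element is one line of text:

import apache_beam as beam
from apache_beam.io.textio import ReadAllFromText

with beam.Pipeline(options=pipeline_options) as p:
    lines = (
        p
        | 'File paths' >> beam.Create(list_files)  # PCollection of gs:// paths
        | 'Read all files' >> ReadAllFromText()    # one string per line
    )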

In the end, I just used google-cloud-storage as the reading API.

List all the elements of the bucket:

def list_blobs(bucket_name):
    """Lists all the blobs in the bucket."""
    storage_client = storage.Client()
    blobs = storage_client.list_blobs(bucket_name)
    json_paths = []
    for blob in blobs:
        # Bare blob names are enough here: the ParDo below receives the
        # bucket name as a separate argument.
        #json_paths.append(f"gs://{bucket_name}/{blob.name}")
        json_paths.append(f"{blob.name}")
    return json_paths

I created this ParDo to read the contents:

class ReadFileContent(beam.DoFn):

    def setup(self):
        # Called whenever the DoFn instance is deserialized on the worker.
        # It can run more than once per worker, because multiple instances
        # of a given DoFn subclass may be created (e.g., due to
        # parallelization, or garbage collection after a period of disuse).
        # This is a good place to connect to database instances, open
        # network connections, or acquire other resources.
        self.storage_client = storage.Client()

    def process(self, file_name, bucket_name):
        bucket = self.storage_client.get_bucket(bucket_name)
        blob = bucket.get_blob(file_name)
        # Note: newer google-cloud-storage releases deprecate
        # download_as_string() in favor of download_as_bytes().
        yield blob.download_as_string()
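
As a side note, a variant that avoids managing a storage client yourself is Beam's FileSystems API, which can open gs:// paths directly. A minimal sketch, assuming list_blobs returns full gs:// paths (the commented-out line above); ReadFileContentFS is a hypothetical name:

from apache_beam.io.filesystems import FileSystems

class ReadFileContentFS(beam.DoFn):

    def process(self, file_path):
        # file_path must be a full path such as gs://bucket/file.json
        with FileSystems.open(file_path) as f:
            yield f.read()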

My pipeline looks like this:

list_files = list_blobs(bucket_name)

with beam.Pipeline(options=pipeline_options) as p:

    results = (
        p | 'Create' >> beam.Create(list_files)
          | 'Read each file content' >> beam.ParDo(ReadFileContent(), bucket_name)
          | 'next transformation' >> ...
    )
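
Since the files are JSON, the next transformation would typically parse the downloaded bytes. A minimal sketch, assuming each blob holds a single JSON document (ParseJson is a hypothetical name):

import json

class ParseJson(beam.DoFn):

    def process(self, content):
        # content is the raw bytes yielded by ReadFileContent for one blob
        yield json.loads(content)

It would plug into the pipeline as | 'next transformation' >> beam.ParDo(ParseJson()).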