在 Apache Beam 管道中使用 MatchFiles() 获取文件名并在 python 中解析 json

Using MatchFiles() in apache beam pipeline to get file name and parse json in python

我在一个桶中有很多json个文件并使用python 3 我想获取文件名然后创建文件的键值对并读取它们。匹配文件现在适用于 python 我相信,但我想知道我将如何实现它:

files = p | fileio.MatchFiles("gs://mybuckenumerate/*.json") 
    | #Ideally want to create a tuple of filename, json row which I will pass into a ParDo that is a custom class that parses the json

假设我的存储桶中有 10 个文件:

gs://mybucket/myfile1.json
gs://mybucket/myfile2.json

并且存储桶中的 json 个文件都共享相同的结构

我将它传递到自定义 ParseFile class(我认为通过 ParDo,我的 apache beam 知识有限)并且对于 json 中的每一行我正在输出一个字典(我将保存到换行符分隔 json) 其中一个键是文件名。

太平洋标准时间 9 月 24 日 11:15 编辑:这是我尝试过的

file_content_pairs = (p 
                | fileio.MatchFiles(known_args.input_bucket)
                | fileio.ReadMatches()
                | beam.Map(lambda file: (file.metadata.path, json.loads(file.read_utf8())))
                | beam.ParDo(TestThis())
                )

TestThis() 只是应该打印内容:

class TestThis(beam.DoFn):

    def process(self, element):
        print(element)
        print("stop")
        yield element

但我在输出中看到的是:INFO:root:Finished 在 1.2762866020202637 秒内列出 2 个文件。

没看懂。您想要 (filename, json-parsed-contents) 的键值对吗?

如果是这样,你会:

file_content_pairs = (
  p | fileio.MatchFiles("gs://mybucketname/*.json")
    | fileio.ReadMatches()
    | beam.Map(lambda file: (file.metadata.path, json.loads(file.read_utf8()))
)

因此,如果您的文件如下所示:

==============myfile.json===============
{"a": "b",
 "c": "d",
 "e": 1}

然后,您的 file_content_pairs 集合将包含键值对 ("myfile.json", {"a":"b", "c": "d", "e": 1})


如果您的文件是 json 行格式,您可以:

def consume_file(f):
  other_name = query_bigquery(f.metadata.path)
  return [(other_name, json.loads(line))
          for line in f.read_utf8().strip().split('\n')]

with Pipeline() as p:
  result = (p
            | fileio.MatchFiles("gs://mybucketname/*.json")
            | fileio.ReadMatches()
            | beam.FlatMap(consume_file))