在 Apache Beam 管道中使用 MatchFiles() 获取文件名并在 python 中解析 json
Using MatchFiles() in apache beam pipeline to get file name and parse json in python
我在一个桶中有很多json个文件并使用python 3 我想获取文件名然后创建文件的键值对并读取它们。匹配文件现在适用于 python 我相信,但我想知道我将如何实现它:
files = p | fileio.MatchFiles("gs://mybuckenumerate/*.json")
| #Ideally want to create a tuple of filename, json row which I will pass into a ParDo that is a custom class that parses the json
假设我的存储桶中有 10 个文件:
gs://mybucket/myfile1.json
gs://mybucket/myfile2.json
并且存储桶中的 json 个文件都共享相同的结构
我将它传递到自定义 ParseFile class(我认为通过 ParDo,我的 apache beam 知识有限)并且对于 json 中的每一行我正在输出一个字典(我将保存到换行符分隔 json) 其中一个键是文件名。
太平洋标准时间 9 月 24 日 11:15 编辑:这是我尝试过的
file_content_pairs = (p
| fileio.MatchFiles(known_args.input_bucket)
| fileio.ReadMatches()
| beam.Map(lambda file: (file.metadata.path, json.loads(file.read_utf8())))
| beam.ParDo(TestThis())
)
TestThis() 只是应该打印内容:
class TestThis(beam.DoFn):
def process(self, element):
print(element)
print("stop")
yield element
但我在输出中看到的是:INFO:root:Finished 在 1.2762866020202637 秒内列出 2 个文件。
没看懂。您想要 (filename, json-parsed-contents)
的键值对吗?
如果是这样,你会:
file_content_pairs = (
p | fileio.MatchFiles("gs://mybucketname/*.json")
| fileio.ReadMatches()
| beam.Map(lambda file: (file.metadata.path, json.loads(file.read_utf8()))
)
因此,如果您的文件如下所示:
==============myfile.json===============
{"a": "b",
"c": "d",
"e": 1}
然后,您的 file_content_pairs
集合将包含键值对 ("myfile.json", {"a":"b", "c": "d", "e": 1})
。
如果您的文件是 json 行格式,您可以:
def consume_file(f):
other_name = query_bigquery(f.metadata.path)
return [(other_name, json.loads(line))
for line in f.read_utf8().strip().split('\n')]
with Pipeline() as p:
result = (p
| fileio.MatchFiles("gs://mybucketname/*.json")
| fileio.ReadMatches()
| beam.FlatMap(consume_file))
我在一个桶中有很多json个文件并使用python 3 我想获取文件名然后创建文件的键值对并读取它们。匹配文件现在适用于 python 我相信,但我想知道我将如何实现它:
files = p | fileio.MatchFiles("gs://mybuckenumerate/*.json")
| #Ideally want to create a tuple of filename, json row which I will pass into a ParDo that is a custom class that parses the json
假设我的存储桶中有 10 个文件:
gs://mybucket/myfile1.json
gs://mybucket/myfile2.json
并且存储桶中的 json 个文件都共享相同的结构
我将它传递到自定义 ParseFile class(我认为通过 ParDo,我的 apache beam 知识有限)并且对于 json 中的每一行我正在输出一个字典(我将保存到换行符分隔 json) 其中一个键是文件名。
太平洋标准时间 9 月 24 日 11:15 编辑:这是我尝试过的
file_content_pairs = (p
| fileio.MatchFiles(known_args.input_bucket)
| fileio.ReadMatches()
| beam.Map(lambda file: (file.metadata.path, json.loads(file.read_utf8())))
| beam.ParDo(TestThis())
)
TestThis() 只是应该打印内容:
class TestThis(beam.DoFn):
def process(self, element):
print(element)
print("stop")
yield element
但我在输出中看到的是:INFO:root:Finished 在 1.2762866020202637 秒内列出 2 个文件。
没看懂。您想要 (filename, json-parsed-contents)
的键值对吗?
如果是这样,你会:
file_content_pairs = (
p | fileio.MatchFiles("gs://mybucketname/*.json")
| fileio.ReadMatches()
| beam.Map(lambda file: (file.metadata.path, json.loads(file.read_utf8()))
)
因此,如果您的文件如下所示:
==============myfile.json===============
{"a": "b",
"c": "d",
"e": 1}
然后,您的 file_content_pairs
集合将包含键值对 ("myfile.json", {"a":"b", "c": "d", "e": 1})
。
如果您的文件是 json 行格式,您可以:
def consume_file(f):
other_name = query_bigquery(f.metadata.path)
return [(other_name, json.loads(line))
for line in f.read_utf8().strip().split('\n')]
with Pipeline() as p:
result = (p
| fileio.MatchFiles("gs://mybucketname/*.json")
| fileio.ReadMatches()
| beam.FlatMap(consume_file))