如何在 Apache Beam 中拆分包含 json 个元素的文件
How do I split a file of json elements in Apache Beam
我将 Apache Beam 与 Python 一起使用,我有一个 ppl 文件,看起来像这样:
FileExample.ppl:
{"name":"Julio"}
{"name":"Angel", "Age":35}
{"name":"Maria","cellphone":NULL}
{"name":NULL,"cellphone":"3451-14-12"}
等...
我需要拆分文件不是针对每一行,而是针对每个 json(在实际文件中,json 不仅是一行,而且是多行和未定义的行数)。
然后我需要验证每个 json 的内容(因为在文件中有 6 种类型的 jsons,那些具有所有具有值的键,那些不,等等)。之后,我需要为每种类型的 json 使用不同的 pcollection。我正在考虑使用 beam.flatmap() 来实现这最后一步,但首先我需要有这样的东西:
jsons = pipeline | "splitElements" >> ReadFromText(file)
提前谢谢你,请记住我是新手。
ReadFromText
总是一次一行地读取文本文件;如果您的 json 对象跨行拆分,您将不得不进行不同类型的读取。
一个选项是在 DoFn 中完整读取每个文件,例如
with beam.Pipeline() as p:
readable_files = (
p
| beam.Create([...set of files to read...]). # or use fileio.MatchAll
| fileio.ReadMatches)
file_contents = paths | beam.ParDo(ReadFileDoFn())
其中 ReadFileDoFn
可以使用与 ReadFromText
相同的底层库,例如
class ReadFileDoFn(beam.DoFn):
def process(self, readable_file):
with readable_file.open() as handle:
yield handle.read()
这将生成一个 PCollection,其元素是每个文件的全部内容。现在要将您的文本文件拆分为单独的 json 个对象,您可以执行类似
def text_blob_to_json_objects(text):
# Turns a concatenated set of objects like '{...} {...}' into
# a single json array '[{...}, {...}]'.
as_json_array = '[%s]' % re.sub(r'}\S*{', '},{', text, re.M)
# Returns the parsed array.
return json.loads(as_json_array)
file_contents | beam.FlatMap(text_blob_to_json_objects)
然后您可以在后面加上 multi-output DoFn 来分隔各种类型。
我将 Apache Beam 与 Python 一起使用,我有一个 ppl 文件,看起来像这样:
FileExample.ppl:
{"name":"Julio"}
{"name":"Angel", "Age":35}
{"name":"Maria","cellphone":NULL}
{"name":NULL,"cellphone":"3451-14-12"}
等...
我需要拆分文件不是针对每一行,而是针对每个 json(在实际文件中,json 不仅是一行,而且是多行和未定义的行数)。
然后我需要验证每个 json 的内容(因为在文件中有 6 种类型的 jsons,那些具有所有具有值的键,那些不,等等)。之后,我需要为每种类型的 json 使用不同的 pcollection。我正在考虑使用 beam.flatmap() 来实现这最后一步,但首先我需要有这样的东西:
jsons = pipeline | "splitElements" >> ReadFromText(file)
提前谢谢你,请记住我是新手。
ReadFromText
总是一次一行地读取文本文件;如果您的 json 对象跨行拆分,您将不得不进行不同类型的读取。
一个选项是在 DoFn 中完整读取每个文件,例如
with beam.Pipeline() as p:
readable_files = (
p
| beam.Create([...set of files to read...]). # or use fileio.MatchAll
| fileio.ReadMatches)
file_contents = paths | beam.ParDo(ReadFileDoFn())
其中 ReadFileDoFn
可以使用与 ReadFromText
相同的底层库,例如
class ReadFileDoFn(beam.DoFn):
def process(self, readable_file):
with readable_file.open() as handle:
yield handle.read()
这将生成一个 PCollection,其元素是每个文件的全部内容。现在要将您的文本文件拆分为单独的 json 个对象,您可以执行类似
def text_blob_to_json_objects(text):
# Turns a concatenated set of objects like '{...} {...}' into
# a single json array '[{...}, {...}]'.
as_json_array = '[%s]' % re.sub(r'}\S*{', '},{', text, re.M)
# Returns the parsed array.
return json.loads(as_json_array)
file_contents | beam.FlatMap(text_blob_to_json_objects)
然后您可以在后面加上 multi-output DoFn 来分隔各种类型。