如何在 Apache Beam 中拆分包含 json 个元素的文件

How do I split a file of json elements in Apache Beam

我将 Apache Beam 与 Python 一起使用,我有一个 ppl 文件,看起来像这样:
FileExample.ppl:

{"name":"Julio"} 
{"name":"Angel", "Age":35} 
{"name":"Maria","cellphone":NULL} 
{"name":NULL,"cellphone":"3451-14-12"} 

等...

我需要拆分文件不是针对每一行,而是针对每个 json(在实际文件中,json 不仅是一行,而且是多行和未定义的行数)。

然后我需要验证每个 json 的内容(因为在文件中有 6 种类型的 jsons,那些具有所有具有值的键,那些不,等等)。之后,我需要为每种类型的 json 使用不同的 pcollection。我正在考虑使用 beam.flatmap() 来实现这最后一步,但首先我需要有这样的东西:

jsons = pipeline | "splitElements"  >> ReadFromText(file)

提前谢谢你,请记住我是新手。

ReadFromText 总是一次一行地读取文本文件;如果您的 json 对象跨行拆分,您将不得不进行不同类型的读取。

一个选项是在 DoFn 中完整读取每个文件,例如

with beam.Pipeline() as p:
    readable_files = (
        p
        | beam.Create([...set of files to read...]). # or use fileio.MatchAll
        | fileio.ReadMatches)
    file_contents = paths | beam.ParDo(ReadFileDoFn())

其中 ReadFileDoFn 可以使用与 ReadFromText 相同的底层库,例如

class ReadFileDoFn(beam.DoFn):
  def process(self, readable_file):
    with readable_file.open() as handle:
      yield handle.read()

这将生成一个 PCollection,其元素是每个文件的全部内容。现在要将您的文本文件拆分为单独的 json 个对象,您可以执行类似

def text_blob_to_json_objects(text):
  # Turns a concatenated set of objects like '{...} {...}' into
  # a single json array '[{...}, {...}]'.
  as_json_array = '[%s]' % re.sub(r'}\S*{', '},{', text, re.M)
  # Returns the parsed array.
  return json.loads(as_json_array)

file_contents | beam.FlatMap(text_blob_to_json_objects)

然后您可以在后面加上 multi-output DoFn 来分隔各种类型。