
Insert data into BigQuery separate tables from same file with different schema using Dataflow(Apache Beam) using side output

I have a requirement to build an audit mechanism. For example, there is a JSON file named emp.json:

{
    "magic": "atMSG",
    "type": "DT",
    "headers": null,
    "messageschemaid": null,
    "messageschema": null,
    "message": {
        "data": {
            "ID": "data_in_quotes",
            "NAME": "data_in_quotes",
            "SALARY": "data_in_quotes"
        },
        "beforeData": null,
        "headers": {
            "operation": "INSERT",
            "changeSequence": "20200822230048000000000017887787417",
            "timestamp": "2020-08-22T23:00:48.000",
            "transactionId": "some_id"
        }
    }
}

First, I need to insert the data into the BigQuery table Staging.emp:

ID, NAME, SALARY

1, ABC, 20000

2, XYZ, 30000

I also need to insert the ID, a timestamp (the load timestamp), and the file name into a separate table: misc-dataset.Audit_table

ID, TIMESTAMP, FILENAME

1, 28-08-2020 22:55, emp.json

2, 28-08-2020 22:55, emp.json

For now I am writing to files for testing. Once this is resolved, I will use:

| "WriteToBigQuery" >> beam.io.WriteToBigQuery(
                        "{0}:{1}.emp_data".format(projectId, datasetId),
                        schema=table_schema,
                        #write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
                        write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE,
                        create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED
                        )
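
The audit rows would presumably go to a second sink of the same shape; a minimal sketch, where the Audit_col name and the schema string are assumptions, not tested code:

Audit_col | "WriteAuditToBigQuery" >> beam.io.WriteToBigQuery(
                        "{0}:misc-dataset.Audit_table".format(projectId),
                        schema="ID:STRING,TIMESTAMP:TIMESTAMP,FILENAME:STRING",  # assumed column types
                        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
                        create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED
                        )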

I am not able to get this working with side outputs using the code below.

class InsertIntoBQAndAudit(beam.DoFn):
    
    def process(self, element):
        norm = json_normalize(json.loads(element), max_level=1)
        l = norm["message.data"]
        return l
    
    def process(self, element):
        norm1 = json_normalize(json.loads(element), max_level=1)
        l1 = norm1["message.data"]
        
        l1['ID']
        l1['TIMESTAMP':datetime.now()]
        l1.[m.metadata_list for m in gcs.match(['gs://ti-project-1/emp*'])]
        return [beam.pvalue.TaggedOutput('Audit', l1)]

options = PipelineOptions()
p = beam.Pipeline(options=options)

data_from_source = (p
                    | "READ FROM JSON" >> ReadFromText("gs://ti-project-1/input/bill_document_prod_data_sample_records")
                     | "Send to Different Tags" >> beam.ParDo(InsertIntoBQAndAudit()).with_outputs('Audit', main='Load')
                   )

Audit_col = data_from_source.Audit
Load_col = data_from_source.Load

Load_col | "Write to Actual Table" >> WriteToText("gs://ti-project-1/output/data_output")
Audit_col | "Write to Audit Table" >> WriteToText("gs://ti-project-1/misc-temp/audit_out")

p.run()

A DoFn cannot have two process methods. Instead, create two separate DoFns and apply each to the same PCollection, e.g.

class Audit(beam.DoFn):
  def process(self, l1):
    l1['ID']
    l1['TIMESTAMP':datetime.now()]
    # This line above seems to have gotten mangled in transit...
    l1 = [m.metadata_list for m in gcs.match(['gs://ti-project-1/emp*'])]
    yield l1

with beam.Pipeline(options=options) as p:
  data_from_source = p | "READ FROM JSON" >> ReadFromText("gs://...")
  parsed_data = data_from_source | beam.Map(
      lambda element: json_normalize(json.loads(element), max_level=1)["message.data"])
  
  Load_col = parsed_data
  Audit_col = parsed_data | beam.ParDo(Audit())
  ...
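
If the intent of the mangled lines is to emit a row with the ID, a load timestamp, and the source file name, the Audit DoFn could look roughly like this. This is only a sketch: it assumes each element is a dict with an 'ID' key, and it uses FileSystems.match to mirror the gcs.match call from the question.

from datetime import datetime
from apache_beam.io.filesystems import FileSystems

class Audit(beam.DoFn):
  def process(self, l1):
    # Resolve the matching input file name(s), as the question attempts with gcs.match.
    match_results = FileSystems.match(['gs://ti-project-1/emp*'])
    filenames = [md.path for m in match_results for md in m.metadata_list]
    yield {
        'ID': l1['ID'],
        'TIMESTAMP': datetime.now().isoformat(),  # load timestamp
        'FILENAME': filenames[0] if filenames else None,
    }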

Alternatively, you can use a multi-output DoFn for this, which would look like

class ParseAndAudit(beam.DoFn):
    
    def process(self, element):
        norm = json_normalize(json.loads(element), max_level=1)
        l = norm["message.data"]
        yield l  # emit the main output
    
        # continue in the same process method
        l1 = l
        
        l1['ID']
        l1['TIMESTAMP':datetime.now()]
        l1.[m.metadata_list for m in gcs.match(['gs://ti-project-1/emp*'])]
        yield beam.pvalue.TaggedOutput('Audit', l1)

and then use it as above, but it can be more complex to write and maintain.
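
For completeness, consuming the multi-output version looks much like the pipeline already in the question; a minimal sketch reusing the same paths and output tags:

with beam.Pipeline(options=options) as p:
  outputs = (p
             | "READ FROM JSON" >> ReadFromText("gs://ti-project-1/input/bill_document_prod_data_sample_records")
             | "Parse and Tag" >> beam.ParDo(ParseAndAudit()).with_outputs('Audit', main='Load'))

  Load_col = outputs.Load
  Audit_col = outputs.Audit

  Load_col | "Write to Actual Table" >> WriteToText("gs://ti-project-1/output/data_output")
  Audit_col | "Write to Audit Table" >> WriteToText("gs://ti-project-1/misc-temp/audit_out")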