Insert data from the same file into separate BigQuery tables with different schemas using Dataflow (Apache Beam) side outputs
I have a requirement to build an audit mechanism. For example, there is a JSON file named emp.json:
{
  "magic": "atMSG",
  "type": "DT",
  "headers": null,
  "messageschemaid": null,
  "messageschema": null,
  "message": {
    "data": {
      "ID": "data_in_quotes",
      "NAME": "data_in_quotes",
      "SALARY": "data_in_quotes"
    },
    "beforeData": null,
    "headers": {
      "operation": "INSERT",
      "changeSequence": "20200822230048000000000017887787417",
      "timestamp": "2020-08-22T23:00:48.000",
      "transactionId": "some_id"
    }
  }
}
First I need to insert the data into a BigQuery table:
Staging.emp
ID, NAME, SALARY
1, ABC, 20000
2, XYZ, 30000
I also need to insert the ID, a timestamp (the load timestamp), and the file name into a separate table:
Misc-dataset.Audit_table
ID, TIMESTAMP, FILENAME
1, 28-08-2020 22:55, emp.json
2, 28-08-2020 22:55, emp.json
For now I am writing to files for testing. Once this is working, I will use:
| "WriteToBigQuery" >> beam.io.WriteToBigQuery(
"{0}:{1}.emp_data".format(projectId, datasetId),
schema=table_schema,
#write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE,
create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED
)
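The table_schema referenced above is not shown; a minimal sketch, assuming the staging columns stay as strings (the JSON values arrive quoted) and hypothetical audit column names, could pass WriteToBigQuery plain "name:TYPE" schema strings:

# Hypothetical schema strings -- adjust names and types to the real tables.
emp_table_schema = "ID:STRING,NAME:STRING,SALARY:STRING"
audit_table_schema = "ID:STRING,TIMESTAMP:TIMESTAMP,FILENAME:STRING"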
I am not able to get this working with side outputs using the code below:
class InsertIntoBQAndAudit(beam.DoFn):

    def process(self, element):
        norm = json_normalize(json.loads(element), max_level=1)
        l = norm["message.data"]
        return l

    def process(self, element):
        norm1 = json_normalize(json.loads(element), max_level=1)
        l1 = norm1["message.data"]
        l1['ID']
        l1['TIMESTAMP':datetime.now()]
        l1.[m.metadata_list for m in gcs.match(['gs://ti-project-1/emp*'])]
        return [beam.pvalue.TaggedOutput('Audit', l1)]
options = PipelineOptions()
p = beam.Pipeline(options=options)

data_from_source = (p
                    | "READ FROM JSON" >> ReadFromText("gs://ti-project-1/input/bill_document_prod_data_sample_records")
                    | "Send to Different Tags" >> beam.ParDo(InsertIntoBQAndAudit()).with_outputs('Audit', main='Load')
                    )

Audit_col = data_from_source.Audit
Load_col = data_from_source.Load

Load_col | "Write to Actual Table" >> WriteToText("gs://ti-project-1/output/data_output")
Audit_col | "Write to Audit Table" >> WriteToText("gs://ti-project-1/misc-temp/audit_out")

p.run()
A DoFn cannot have two process methods. Instead, create two separate DoFns and apply each to the same PCollection, e.g.
class Audit(beam.DoFn):
    def process(self, l1):
        l1['ID']
        l1['TIMESTAMP':datetime.now()]
        # This line above seems to have gotten mangled in transit...
        l1 = [m.metadata_list for m in gcs.match(['gs://ti-project-1/emp*'])]
        yield l1
with beam.Pipeline(options=options) as p:
    data_from_source = p | "READ FROM JSON" >> ReadFromText("gs://...")
    parsed_data = data_from_source | beam.Map(
        lambda element: json_normalize(json.loads(element), max_level=1)["message.data"])
    Load_col = parsed_data
    Audit_col = parsed_data | beam.ParDo(Audit())
    ...
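To complete this first approach, the ... above can be filled in by writing each collection to its own table, still inside the with block. A minimal sketch, assuming placeholder project/dataset names, the hypothetical schema strings from earlier, and that each element is a dict matching its table's columns:

    Load_col | "Write emp" >> beam.io.WriteToBigQuery(
        "{0}:Staging.emp".format(projectId),
        schema="ID:STRING,NAME:STRING,SALARY:STRING",
        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
        create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED)
    Audit_col | "Write audit" >> beam.io.WriteToBigQuery(
        "{0}:{1}.Audit_table".format(projectId, audit_dataset),  # audit_dataset is a placeholder
        schema="ID:STRING,TIMESTAMP:TIMESTAMP,FILENAME:STRING",
        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
        create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED)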
You can use a multi-output DoFn for this, which would look something like
class ParseAndAudit(beam.DoFn):
    def process(self, element):
        norm = json_normalize(json.loads(element), max_level=1)
        l = norm["message.data"]
        yield l  # emit the main output

        # continue in the same process method
        l1 = l
        l1['ID']
        l1['TIMESTAMP':datetime.now()]
        l1.[m.metadata_list for m in gcs.match(['gs://ti-project-1/emp*'])]
        yield beam.pvalue.TaggedOutput('Audit', l1)
Then use it with .with_outputs('Audit', main='Load') as above, though this can be more complex to write and maintain.
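Since the audit-building lines carried over from the question are garbled, here is a hedged sketch of what such a multi-output DoFn might look like once the audit record is built explicitly. The audit field names, the FileSystems.match lookup for the file name, and the use of processing time as the load timestamp are all assumptions, not the original code:

import json
from datetime import datetime

import apache_beam as beam
from apache_beam.io.filesystems import FileSystems
from pandas import json_normalize


class ParseAndAudit(beam.DoFn):
    def process(self, element):
        # Parse one JSON line and pull out the nested "message.data" record.
        norm = json_normalize(json.loads(element), max_level=1)
        data = dict(norm["message.data"][0])  # e.g. {"ID": ..., "NAME": ..., "SALARY": ...}
        yield data  # main (untagged) output -> staging table

        # Assumed reconstruction of the audit record: ID, load timestamp, source file name.
        matches = FileSystems.match(["gs://ti-project-1/emp*"])[0].metadata_list
        file_name = matches[0].path if matches else "unknown"
        audit = {
            "ID": data["ID"],
            "TIMESTAMP": datetime.utcnow().isoformat(),
            "FILENAME": file_name,
        }
        yield beam.pvalue.TaggedOutput("Audit", audit)

Applied with beam.ParDo(ParseAndAudit()).with_outputs('Audit', main='Load'), the Load output would feed Staging.emp and the Audit output the audit table, matching the pipeline wiring from the question.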