Apache Beam 到 BigQuery
Apache Beam To BigQuery
我正在 Google 云数据流中构建一个进程,它将使用 Pub/Sub 中的消息,并根据一个键的值将它们写入 BQ 或 GCS。我能够拆分消息,但我不确定如何将数据写入 BigQuery。我试过使用 beam.io.gcp.bigquery.WriteToBigQuery
,但没有成功。
我的完整代码在这里:https://pastebin.com/4W9Vu4Km
基本上我的问题是我不知道如何在 WriteBatchesToBQ
(第 73 行)中指定变量 element
应该写入 BQ。
我也尝试过直接在管道中使用 beam.io.gcp.bigquery.WriteToBigQuery
(第 128 行),但随后出现错误 AttributeError: 'list' object has no attribute 'items' [while running 'Write to BQ/_StreamToBigQuery/StreamInsertRows/ParDo(BigQueryWriteFn)']
。这可能是因为我给它提供的不是字典,而是字典列表(我想用 1-minute windows)。
有什么想法吗? (另外,如果代码中有什么太愚蠢的地方,请告诉我 - 我只是在短时间内玩 apache beam,我可能会忽略一些明显的问题)。
WriteToBigQuery 示例格式如下:-
project_id = "proj1"
dataset_id = 'dataset1'
table_id = 'table1'
table_schema = ('id:STRING, reqid:STRING')
| 'Write-CH' >> beam.io.WriteToBigQuery(
table=table_id,
dataset=dataset_id,
project=project_id,
schema=table_schema,
create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND
))
你可以参考这个case它会让你对beam数据流水线有一个简单的了解。
第二种方法是解决这个问题的方法,你需要直接在管道中使用WriteToBigQuery函数。但是,需要包含一个 beam.FlatMap 步骤,以便 WriteToBigQuery 可以正确处理字典列表。
因此完整的流水线拆分数据,按时间分组,写入BQ是这样定义的:
accepted_messages = tagged_lines_result[Split.OUTPUT_TAG_BQ] | "Window into BQ" >> GroupWindowsIntoBatches(
window_size) | "FlatMap" >> beam.FlatMap(
lambda elements: elements) | "Write to BQ" >> beam.io.gcp.bigquery.WriteToBigQuery(table=output_table_bq,
schema=(
output_table_bq_schema),
write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED)
完整的工作代码在这里:https://pastebin.com/WFwBvPcU
我正在 Google 云数据流中构建一个进程,它将使用 Pub/Sub 中的消息,并根据一个键的值将它们写入 BQ 或 GCS。我能够拆分消息,但我不确定如何将数据写入 BigQuery。我试过使用 beam.io.gcp.bigquery.WriteToBigQuery
,但没有成功。
我的完整代码在这里:https://pastebin.com/4W9Vu4Km
基本上我的问题是我不知道如何在 WriteBatchesToBQ
(第 73 行)中指定变量 element
应该写入 BQ。
我也尝试过直接在管道中使用 beam.io.gcp.bigquery.WriteToBigQuery
(第 128 行),但随后出现错误 AttributeError: 'list' object has no attribute 'items' [while running 'Write to BQ/_StreamToBigQuery/StreamInsertRows/ParDo(BigQueryWriteFn)']
。这可能是因为我给它提供的不是字典,而是字典列表(我想用 1-minute windows)。
有什么想法吗? (另外,如果代码中有什么太愚蠢的地方,请告诉我 - 我只是在短时间内玩 apache beam,我可能会忽略一些明显的问题)。
WriteToBigQuery 示例格式如下:-
project_id = "proj1"
dataset_id = 'dataset1'
table_id = 'table1'
table_schema = ('id:STRING, reqid:STRING')
| 'Write-CH' >> beam.io.WriteToBigQuery(
table=table_id,
dataset=dataset_id,
project=project_id,
schema=table_schema,
create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND
))
你可以参考这个case它会让你对beam数据流水线有一个简单的了解。
第二种方法是解决这个问题的方法,你需要直接在管道中使用WriteToBigQuery函数。但是,需要包含一个 beam.FlatMap 步骤,以便 WriteToBigQuery 可以正确处理字典列表。
因此完整的流水线拆分数据,按时间分组,写入BQ是这样定义的:
accepted_messages = tagged_lines_result[Split.OUTPUT_TAG_BQ] | "Window into BQ" >> GroupWindowsIntoBatches(
window_size) | "FlatMap" >> beam.FlatMap(
lambda elements: elements) | "Write to BQ" >> beam.io.gcp.bigquery.WriteToBigQuery(table=output_table_bq,
schema=(
output_table_bq_schema),
write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED)
完整的工作代码在这里:https://pastebin.com/WFwBvPcU