Apache Beam 到 BigQuery

Apache Beam To BigQuery

我正在 Google 云数据流中构建一个进程,它将使用 Pub/Sub 中的消息,并根据一个键的值将它们写入 BQ 或 GCS。我能够拆分消息,但我不确定如何将数据写入 BigQuery。我试过使用 beam.io.gcp.bigquery.WriteToBigQuery,但没有成功。

我的完整代码在这里:https://pastebin.com/4W9Vu4Km

基本上我的问题是我不知道如何在 WriteBatchesToBQ(第 73 行)中指定变量 element 应该写入 BQ。

我也尝试过直接在管道中使用 beam.io.gcp.bigquery.WriteToBigQuery(第 128 行),但随后出现错误 AttributeError: 'list' object has no attribute 'items' [while running 'Write to BQ/_StreamToBigQuery/StreamInsertRows/ParDo(BigQueryWriteFn)'] 。这可能是因为我给它提供的不是字典,而是字典列表(我想用 1-minute windows)。

有什么想法吗? (另外,如果代码中有什么太愚蠢的地方,请告诉我 - 我只是在短时间内玩 apache beam,我可能会忽略一些明显的问题)。

WriteToBigQuery 示例格式如下:-

    project_id = "proj1"
    dataset_id = 'dataset1'
    table_id = 'table1'
    table_schema = ('id:STRING, reqid:STRING')

        | 'Write-CH' >> beam.io.WriteToBigQuery(
                                                    table=table_id,
                                                    dataset=dataset_id,
                                                    project=project_id,
                                                    schema=table_schema,
                                                    create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
                                                    write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND
                                                    ))

你可以参考这个case它会让你对beam数据流水线有一个简单的了解。

第二种方法是解决这个问题的方法,你需要直接在管道中使用WriteToBigQuery函数。但是,需要包含一个 beam.FlatMap 步骤,以便 WriteToBigQuery 可以正确处理字典列表。

因此完整的流水线拆分数据,按时间分组,写入BQ是这样定义的:

 accepted_messages = tagged_lines_result[Split.OUTPUT_TAG_BQ] | "Window into BQ" >> GroupWindowsIntoBatches(
            window_size) | "FlatMap" >> beam.FlatMap(
            lambda elements: elements) | "Write to BQ" >> beam.io.gcp.bigquery.WriteToBigQuery(table=output_table_bq,
                                                                                               schema=(
                                                                                                   output_table_bq_schema),
                                                                                               write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
                                                                                               create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED)

完整的工作代码在这里:https://pastebin.com/WFwBvPcU