WriteToBigQuery 动态 table 目的地 returns 错误 tableId

WriteToBigQuery Dynamic table destinations returns wrong tableId

我正在尝试将 bigquery 写入不同的 table 目的地,如果 table 尚不存在,我想动态创建它们。

bigquery_rows | "Writing to Bigquery" >> WriteToBigQuery(lambda e: compute_table_name(e),
                                                schema=compute_table_schema,
                                                additional_bq_parameters=additional_bq_parameters,
                                                write_disposition=BigQueryDisposition.WRITE_APPEND,
                                                create_disposition=BigQueryDisposition.CREATE_IF_NEEDED,
                                                )

函数compute_table_name其实很简单,我只是想让它发挥作用。

def compute_table_name(element):
    if element['table'] == 'table_id':
        del element['table']
        return "project_id:dataset.table_id"

模式已被正确检测到,table 已创建并填充了记录。问题是,我得到的 table ID 类似于:

datasetId: 'dataset'
projectId: 'project_id'
tableId: 'beam_bq_job_LOAD_AUTOMATIC_JOB_NAME_LOAD_STEP...

我也曾尝试在我的 compute_table_name 函数中返回一个 bigquery.TableReference 对象,但没有成功。

编辑:我正在使用 apache-beam 2.34.0 并且我在 JIRA here

上打开了一个问题

你的流水线代码没问题。但是,您可以将可调用对象传递给 compute_table 名称函数:

bigquery_rows | "Writing to Bigquery" >> WriteToBigQuery(compute_table_name,
                                            schema=compute_table_schema,
                                            additional_bq_parameters=additional_bq_parameters,
                                            write_disposition=BigQueryDisposition.WRITE_APPEND,
                                            create_disposition=BigQueryDisposition.CREATE_IF_NEEDED,
                                            )

BigQuery 中的 'beam_bq_job_LOAD_AUTOMATIC_JOB_NAME_LOAD_STEP' table 名称可能意味着加载作业尚未完成,或者它有错误;您应该检查 BigQuery UI 中的“个人历史”或“项目历史”选项卡,以查看工作状态。

我按照 找到了答案的解决方案。这感觉像是一种解决方法,因为我没有将可调用对象传递给 WriteToBigQuery()。测试了很多方法,我发现直接给它工作的方法提供 string/TableReference,但不给它一个可调用的。

我每 15 分钟处理约 50 GB 的数据,这些数据分布在 6 个表中并且运行良好。