WriteToBigQuery 动态 table 目的地 returns 错误 tableId
WriteToBigQuery Dynamic table destinations returns wrong tableId
我正在尝试将 bigquery 写入不同的 table 目的地,如果 table 尚不存在,我想动态创建它们。
bigquery_rows | "Writing to Bigquery" >> WriteToBigQuery(lambda e: compute_table_name(e),
schema=compute_table_schema,
additional_bq_parameters=additional_bq_parameters,
write_disposition=BigQueryDisposition.WRITE_APPEND,
create_disposition=BigQueryDisposition.CREATE_IF_NEEDED,
)
函数compute_table_name其实很简单,我只是想让它发挥作用。
def compute_table_name(element):
if element['table'] == 'table_id':
del element['table']
return "project_id:dataset.table_id"
模式已被正确检测到,table 已创建并填充了记录。问题是,我得到的 table ID 类似于:
datasetId: 'dataset'
projectId: 'project_id'
tableId: 'beam_bq_job_LOAD_AUTOMATIC_JOB_NAME_LOAD_STEP...
我也曾尝试在我的 compute_table_name 函数中返回一个 bigquery.TableReference 对象,但没有成功。
编辑:我正在使用 apache-beam 2.34.0 并且我在 JIRA here
上打开了一个问题
你的流水线代码没问题。但是,您可以将可调用对象传递给 compute_table 名称函数:
bigquery_rows | "Writing to Bigquery" >> WriteToBigQuery(compute_table_name,
schema=compute_table_schema,
additional_bq_parameters=additional_bq_parameters,
write_disposition=BigQueryDisposition.WRITE_APPEND,
create_disposition=BigQueryDisposition.CREATE_IF_NEEDED,
)
BigQuery 中的 'beam_bq_job_LOAD_AUTOMATIC_JOB_NAME_LOAD_STEP'
table 名称可能意味着加载作业尚未完成,或者它有错误;您应该检查 BigQuery UI 中的“个人历史”或“项目历史”选项卡,以查看工作状态。
我按照 找到了答案的解决方案。这感觉像是一种解决方法,因为我没有将可调用对象传递给 WriteToBigQuery()。测试了很多方法,我发现直接给它工作的方法提供 string/TableReference,但不给它一个可调用的。
我每 15 分钟处理约 50 GB 的数据,这些数据分布在 6 个表中并且运行良好。
我正在尝试将 bigquery 写入不同的 table 目的地,如果 table 尚不存在,我想动态创建它们。
bigquery_rows | "Writing to Bigquery" >> WriteToBigQuery(lambda e: compute_table_name(e),
schema=compute_table_schema,
additional_bq_parameters=additional_bq_parameters,
write_disposition=BigQueryDisposition.WRITE_APPEND,
create_disposition=BigQueryDisposition.CREATE_IF_NEEDED,
)
函数compute_table_name其实很简单,我只是想让它发挥作用。
def compute_table_name(element):
if element['table'] == 'table_id':
del element['table']
return "project_id:dataset.table_id"
模式已被正确检测到,table 已创建并填充了记录。问题是,我得到的 table ID 类似于:
datasetId: 'dataset'
projectId: 'project_id'
tableId: 'beam_bq_job_LOAD_AUTOMATIC_JOB_NAME_LOAD_STEP...
我也曾尝试在我的 compute_table_name 函数中返回一个 bigquery.TableReference 对象,但没有成功。
编辑:我正在使用 apache-beam 2.34.0 并且我在 JIRA here
上打开了一个问题你的流水线代码没问题。但是,您可以将可调用对象传递给 compute_table 名称函数:
bigquery_rows | "Writing to Bigquery" >> WriteToBigQuery(compute_table_name,
schema=compute_table_schema,
additional_bq_parameters=additional_bq_parameters,
write_disposition=BigQueryDisposition.WRITE_APPEND,
create_disposition=BigQueryDisposition.CREATE_IF_NEEDED,
)
BigQuery 中的 'beam_bq_job_LOAD_AUTOMATIC_JOB_NAME_LOAD_STEP'
table 名称可能意味着加载作业尚未完成,或者它有错误;您应该检查 BigQuery UI 中的“个人历史”或“项目历史”选项卡,以查看工作状态。
我按照
我每 15 分钟处理约 50 GB 的数据,这些数据分布在 6 个表中并且运行良好。