如何使用气流将 bigquery 导出到 bigtable?架构问题
how to export bigquery to bigtable using airflow? schema issue
我正在使用 Airflow 以 Avro 格式将 BigQuery 行提取到 Google 云存储。
with models.DAG(
"bigquery_to_bigtable",
default_args=default_args,
schedule_interval=None,
start_date=datetime.now(),
catchup=False,
tags=["test"],
) as dag:
data_to_gcs = BigQueryInsertJobOperator(
task_id="data_to_gcs",
project_id=project_id,
location=location,
configuration={
"extract": {
"destinationUri": gcs_uri, "destinationFormat": "AVRO",
"sourceTable": {
"projectId": project_id, "datasetId": dataset_id,
"tableId": table_id}}})
gcs_to_bt = DataflowTemplatedJobStartOperator(
task_id="gcs_to_bt",
template="gs://dataflow-templates/latest/GCS_Avro_to_Cloud_Bigtable",
location=location,
parameters={
'bigtableProjectId': project_id,
'bigtableInstanceId': bt_instance_id,
'bigtableTableId': bt_table_id,
'inputFilePattern': 'gs://export/test.avro-*'
},
)
data_to_gcs >> gcs_to_bt
bigquery 行包含
row_key | 1_cnt | 2_cnt | 3_cnt
1#2021-08-03 | 1 | 2 | 2
2#2021-08-02 | 5 | 1 | 5
.
.
.
我想对 bigtable 中的行键使用 row_key
列,对特定列族中的列使用 rest 列,例如 bigtable 中的 my_cf
。
但是我在使用数据流将 avro 文件加载到 bigtable 时收到错误消息
"java.io.IOException: Failed to start reading from source: gs://export/test.avro-"
Caused by: org.apache.avro.AvroTypeException: Found Root, expecting com.google.cloud.teleport.bigtable.BigtableRow, missing required field key
我读到的docs是这样说的:
The Bigtable table must exist and have the same column families as
exported in the Avro files.
如何在 Avro 中导出具有相同列族的 BigQuery?
我认为您必须将 AVRO 转换为正确的模式。 Documentation 你提到的还说:
- Bigtable expects a specific schema from the input Avro files.
有一个 link 引用特殊数据模式,必须使用它。
如果我理解正确的话,您只是从 table 中导入数据,结果虽然是 AVRO 模式,但不会满足需求模式,因此您需要将数据转换为适合您的 BigTable 模式的正确模式。
我正在使用 Airflow 以 Avro 格式将 BigQuery 行提取到 Google 云存储。
with models.DAG(
"bigquery_to_bigtable",
default_args=default_args,
schedule_interval=None,
start_date=datetime.now(),
catchup=False,
tags=["test"],
) as dag:
data_to_gcs = BigQueryInsertJobOperator(
task_id="data_to_gcs",
project_id=project_id,
location=location,
configuration={
"extract": {
"destinationUri": gcs_uri, "destinationFormat": "AVRO",
"sourceTable": {
"projectId": project_id, "datasetId": dataset_id,
"tableId": table_id}}})
gcs_to_bt = DataflowTemplatedJobStartOperator(
task_id="gcs_to_bt",
template="gs://dataflow-templates/latest/GCS_Avro_to_Cloud_Bigtable",
location=location,
parameters={
'bigtableProjectId': project_id,
'bigtableInstanceId': bt_instance_id,
'bigtableTableId': bt_table_id,
'inputFilePattern': 'gs://export/test.avro-*'
},
)
data_to_gcs >> gcs_to_bt
bigquery 行包含
row_key | 1_cnt | 2_cnt | 3_cnt
1#2021-08-03 | 1 | 2 | 2
2#2021-08-02 | 5 | 1 | 5
.
.
.
我想对 bigtable 中的行键使用 row_key
列,对特定列族中的列使用 rest 列,例如 bigtable 中的 my_cf
。
但是我在使用数据流将 avro 文件加载到 bigtable 时收到错误消息
"java.io.IOException: Failed to start reading from source: gs://export/test.avro-"
Caused by: org.apache.avro.AvroTypeException: Found Root, expecting com.google.cloud.teleport.bigtable.BigtableRow, missing required field key
我读到的docs是这样说的:
The Bigtable table must exist and have the same column families as exported in the Avro files.
如何在 Avro 中导出具有相同列族的 BigQuery?
我认为您必须将 AVRO 转换为正确的模式。 Documentation 你提到的还说:
- Bigtable expects a specific schema from the input Avro files.
有一个 link 引用特殊数据模式,必须使用它。
如果我理解正确的话,您只是从 table 中导入数据,结果虽然是 AVRO 模式,但不会满足需求模式,因此您需要将数据转换为适合您的 BigTable 模式的正确模式。