Table expiration in GCS to BQ Airflow task
I'm using the GCSToBigQueryOperator in Airflow. The task copies a CSV into a new BQ table. Is there a way to add a table expiration to that table within this task?
new_table_task = GCSToBigQueryOperator(
    task_id='insert_gcs_to_bq_tmp_table',
    bucket=BUCKET,
    source_objects=SOURCE_PATH,
    destination_project_dataset_table=f"{BQ_PROJECT}.{DATASET}.{tmp_table_name}",
    write_disposition='WRITE_TRUNCATE',
    skip_leading_rows=1,
    schema_object=SCHEMA_OBJECT
)
If that isn't possible, is my best option to create the table first and define the expiration with DDL, and only then run the GCSToBigQueryOperator? Thanks!
Running a BigQueryCreateEmptyTableOperator with the table's expirationTime before the GCSToBigQueryOperator might solve the problem.
Airflow BigQueryCreateEmptyTableOperator documentation
BigQueryCreateEmptyTableOperator(
    ...
    table_resource={
        "tableReference": {"tableId": ""},
        "expirationTime": ,
    }
)
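For example, a minimal sketch filled in with the question's variables (the task_id and the timestamp are placeholders of mine, not from the docs):

from airflow.providers.google.cloud.operators.bigquery import BigQueryCreateEmptyTableOperator

# Sketch only: create the temporary table with an expiration before loading into it.
# BQ_PROJECT, DATASET and tmp_table_name come from the question; the expirationTime
# is an arbitrary example value in milliseconds since the epoch.
create_tmp_table = BigQueryCreateEmptyTableOperator(
    task_id='create_tmp_table_with_expiration',
    project_id=BQ_PROJECT,
    dataset_id=DATASET,
    table_id=tmp_table_name,
    table_resource={
        "tableReference": {"tableId": tmp_table_name},
        "expirationTime": "1646884330000",
    },
)

create_tmp_table >> new_table_task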
You can use BigQueryHook.patch_table() to update the expiration time after the table is created. It accepts expiration_time as a parameter.
Note: patch_table() only updates the parameters that are passed to it.
expiration_time (Optional[int]) -- [Optional] The time when this table expires, in milliseconds since the epoch.
See the code below:
import datetime

from airflow import models
from airflow.operators import python
from airflow.providers.google.cloud.transfers.gcs_to_bigquery import GCSToBigQueryOperator
from airflow.providers.google.cloud.hooks.bigquery import BigQueryHook

YESTERDAY = datetime.datetime.now() - datetime.timedelta(days=1)

DATASET_NAME = 'your-dataset'
TABLE_NAME = 'your-table'
PROJECT_ID = 'your-project'

default_args = {
    'owner': 'Composer Example',
    'depends_on_past': False,
    'email': [''],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': datetime.timedelta(minutes=5),
    'start_date': YESTERDAY,
}

with models.DAG(
        'create_table_add_expiration',
        catchup=False,
        default_args=default_args,
        schedule_interval=datetime.timedelta(days=1)) as dag:

    # Load the CSV from GCS into the destination table.
    load_csv = GCSToBigQueryOperator(
        task_id='gcs_to_bigquery',
        bucket='bucket-name',
        source_objects=['folder/file.csv'],
        source_format='CSV',
        skip_leading_rows=1,
        destination_project_dataset_table=f"{PROJECT_ID}.{DATASET_NAME}.{TABLE_NAME}",
        autodetect=True,
        write_disposition='WRITE_TRUNCATE',
    )

    # Patch the loaded table to set its expiration time.
    def patch_bq_table(**kwargs):
        hook = BigQueryHook(bigquery_conn_id='bigquery_default', delegate_to=None, use_legacy_sql=False)
        hook.patch_table(
            dataset_id=DATASET_NAME,
            table_id=TABLE_NAME,
            project_id=PROJECT_ID,
            description="test",
            expiration_time=1646884330000  # March 10, 2022 in epoch time (milliseconds)
        )

    update_table = python.PythonOperator(
        task_id='add_expiration',
        provide_context=True,
        python_callable=patch_bq_table,
    )

    load_csv >> update_table
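If you'd rather not hardcode the timestamp, here is a small sketch (my addition, not from the original answer) that computes the milliseconds-since-epoch value, in this case seven days ahead, which you can then pass as expiration_time:

import datetime

# Expire the table 7 days from now; patch_table expects milliseconds since the epoch.
expires_at = datetime.datetime.now(datetime.timezone.utc) + datetime.timedelta(days=7)
expiration_ms = int(expires_at.timestamp() * 1000)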
Airflow test run:
Updated table in BQ:
You can also use the BigQueryUpdateTableOperator. If you specify fields, it will patch the table rather than update it.
from airflow.providers.google.cloud.operators.bigquery import BigQueryUpdateTableOperator

set_bq_table_expiration = BigQueryUpdateTableOperator(
    dag=dag,
    task_id="set_bq_table_expiration",
    dataset_id=DATASET_NAME,
    table_id=TABLE_NAME,
    project_id=PROJECT_ID,
    fields=["expirationTime"],
    table_resource={
        "expirationTime": "1646884330000",
    },
)
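Assuming the operator is added to the same DAG as the load task above, chain it after the load so the patch runs once the table exists, e.g. load_csv >> set_bq_table_expiration.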