将 Bigquery 结果保存到 Google Composer 中的 JSON
Save Bigquery results to JSON in Google Composer
我每天在 DAG 下为 运行 创建一个 sql 脚本。如何将查询结果保存到 JSON 文件并保存在 Google Composer 的 DAG 文件夹中?
import datetime
import airflow
from airflow.operators import bash_operator
from airflow.contrib.operators import bigquery_operator
START_DATE = datetime.datetime(2020, 3, 1)
default_args = {
'owner': 'Alen',
'depends_on_past': False,
'email': [''],
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': datetime.timedelta(minutes=15),
'start_date': START_DATE,
}
with airflow.DAG(
'Dag_Name',
'catchup=False',
default_args=default_args,
schedule_interval=datetime.timedelta(days=1)) as dag:
task_name = bigquery_operator.BigQueryOperator(
task_id='task_name',
sql= 'query.sql',
use_legacy_sql=False,
write_disposition= 'WRITE_TRUNCATE' ,
destination_dataset_table='Project.Dataset.destination_table')
一种替代方法是 运行 从 BQ 导出到 GCS,并将 DAG 文件夹作为目标。
您可以使用 bash 或 bq 运算符
然后 运行 在你的脚本末尾写这样的东西:
copy_files_to_DAG_folder = bash_operator.BashOperator(
task_id='Copy_files_to_GCS',
bash_command='bq extract --destination_format JSON--print_header=false 'BQ_TABLE'
'GCS_DAG_FOLDER_LOCATION''
来自文档:
bq --location=location extract \
--destination_format format \
--compression compression_type \
--field_delimiter delimiter \
--print_header=boolean \
project_id:dataset.table \
gs://bucket/filename.ext
我每天在 DAG 下为 运行 创建一个 sql 脚本。如何将查询结果保存到 JSON 文件并保存在 Google Composer 的 DAG 文件夹中?
import datetime
import airflow
from airflow.operators import bash_operator
from airflow.contrib.operators import bigquery_operator
START_DATE = datetime.datetime(2020, 3, 1)
default_args = {
'owner': 'Alen',
'depends_on_past': False,
'email': [''],
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': datetime.timedelta(minutes=15),
'start_date': START_DATE,
}
with airflow.DAG(
'Dag_Name',
'catchup=False',
default_args=default_args,
schedule_interval=datetime.timedelta(days=1)) as dag:
task_name = bigquery_operator.BigQueryOperator(
task_id='task_name',
sql= 'query.sql',
use_legacy_sql=False,
write_disposition= 'WRITE_TRUNCATE' ,
destination_dataset_table='Project.Dataset.destination_table')
一种替代方法是 运行 从 BQ 导出到 GCS,并将 DAG 文件夹作为目标。
您可以使用 bash 或 bq 运算符
然后 运行 在你的脚本末尾写这样的东西:
copy_files_to_DAG_folder = bash_operator.BashOperator(
task_id='Copy_files_to_GCS',
bash_command='bq extract --destination_format JSON--print_header=false 'BQ_TABLE'
'GCS_DAG_FOLDER_LOCATION''
来自文档:
bq --location=location extract \
--destination_format format \
--compression compression_type \
--field_delimiter delimiter \
--print_header=boolean \
project_id:dataset.table \
gs://bucket/filename.ext