如何使用 Airflow 的 BigQuery 运算符引用外部 SQL 文件?
How can I reference an external SQL file using Airflow's BigQuery operator?
我目前正在使用 Airflow 和 BigQuery 运算符来触发各种 SQL 脚本。当直接在 Airflow DAG 文件中写入 SQL 时,这可以正常工作。例如:
bigquery_transform = BigQueryOperator(
task_id='bq-transform',
bql='SELECT * FROM `example.table`',
destination_dataset_table='example.destination'
)
但是,我想将 SQL 存储在保存到存储桶的单独文件中。例如:
bql='gs://example_bucket/sample_script.sql'
调用此外部文件时收到 "Template Not Found" 错误。
我看过一些示例将 SQL 文件加载到 Airflow DAG 文件夹中,但是,我真的很想访问保存到单独存储桶中的文件。这可能吗?
您可以引用 Google 云存储桶中的任何 SQL 文件。下面是一个示例,我在 airflow dag 存储桶的 sql 目录中调用文件 Query_File.sql。
CONNECTION_ID = 'project_name'
with DAG('dag', schedule_interval='0 9 * * *', template_searchpath=['/home/airflow/gcs/dags/'], max_active_runs=15, catchup=True, default_args=default_args) as dag:
battery_data_quality = BigQueryOperator(
task_id='task-id',
sql='/SQL/Query_File.sql',
destination_dataset_table='project-name.DataSetName.TableName${{ds_nodash}}',
write_disposition='WRITE_TRUNCATE',
bigquery_conn_id=CONNECTION_ID,
use_legacy_sql=False,
dag=dag
)
您还可以考虑使用 gcs_to_gcs operator 将内容从您想要的存储桶复制到 composer 可以访问的存储桶中。
在 Airflow 版本 1.10.3 和 1.10.15 中,GoogleCloudStorageDownloadOperator 的下载方式不同。
def execute(self, context):
self.object = context['dag_run'].conf['job_name'] + '.sql'
logging.info('filemname in GoogleCloudStorageDownloadOperator: %s', self.object)
self.filename = context['dag_run'].conf['job_name'] + '.sql'
self.log.info('Executing download: %s, %s, %s', self.bucket,
self.object, self.filename)
hook = GoogleCloudStorageHook(
google_cloud_storage_conn_id=self.google_cloud_storage_conn_id,
delegate_to=self.delegate_to
)
file_bytes = hook.download(bucket=self.bucket,
object=self.object)
if self.store_to_xcom_key:
if sys.getsizeof(file_bytes) < 49344:
context['ti'].xcom_push(key=self.store_to_xcom_key, value=file_bytes.decode('utf-8'))
else:
raise RuntimeError(
'The size of the downloaded file is too large to push to XCom!'
)
我目前正在使用 Airflow 和 BigQuery 运算符来触发各种 SQL 脚本。当直接在 Airflow DAG 文件中写入 SQL 时,这可以正常工作。例如:
bigquery_transform = BigQueryOperator(
task_id='bq-transform',
bql='SELECT * FROM `example.table`',
destination_dataset_table='example.destination'
)
但是,我想将 SQL 存储在保存到存储桶的单独文件中。例如:
bql='gs://example_bucket/sample_script.sql'
调用此外部文件时收到 "Template Not Found" 错误。
我看过一些示例将 SQL 文件加载到 Airflow DAG 文件夹中,但是,我真的很想访问保存到单独存储桶中的文件。这可能吗?
您可以引用 Google 云存储桶中的任何 SQL 文件。下面是一个示例,我在 airflow dag 存储桶的 sql 目录中调用文件 Query_File.sql。
CONNECTION_ID = 'project_name'
with DAG('dag', schedule_interval='0 9 * * *', template_searchpath=['/home/airflow/gcs/dags/'], max_active_runs=15, catchup=True, default_args=default_args) as dag:
battery_data_quality = BigQueryOperator(
task_id='task-id',
sql='/SQL/Query_File.sql',
destination_dataset_table='project-name.DataSetName.TableName${{ds_nodash}}',
write_disposition='WRITE_TRUNCATE',
bigquery_conn_id=CONNECTION_ID,
use_legacy_sql=False,
dag=dag
)
您还可以考虑使用 gcs_to_gcs operator 将内容从您想要的存储桶复制到 composer 可以访问的存储桶中。
在 Airflow 版本 1.10.3 和 1.10.15 中,GoogleCloudStorageDownloadOperator 的下载方式不同。
def execute(self, context):
self.object = context['dag_run'].conf['job_name'] + '.sql'
logging.info('filemname in GoogleCloudStorageDownloadOperator: %s', self.object)
self.filename = context['dag_run'].conf['job_name'] + '.sql'
self.log.info('Executing download: %s, %s, %s', self.bucket,
self.object, self.filename)
hook = GoogleCloudStorageHook(
google_cloud_storage_conn_id=self.google_cloud_storage_conn_id,
delegate_to=self.delegate_to
)
file_bytes = hook.download(bucket=self.bucket,
object=self.object)
if self.store_to_xcom_key:
if sys.getsizeof(file_bytes) < 49344:
context['ti'].xcom_push(key=self.store_to_xcom_key, value=file_bytes.decode('utf-8'))
else:
raise RuntimeError(
'The size of the downloaded file is too large to push to XCom!'
)