GCP 作曲家从存储桶中读取 sql 并将其推送到 bigquery table
GCP composer read sql from bucket and push it to bigquery table
我们使用 GCP composer(Airflow 管理)作为管弦乐工具,使用 BigQuery 作为数据库。我需要将数据从另一个 table(两个 table 都位于 bigquery db 中)推送到 table,但该方法应该是 upsert。所以我写了一个 sql 脚本,使用 marge 来更新或插入。
我有两个问题:
- 位于 GCP Composer 存储桶中的 marge 脚本,如何从存储桶中读取 sql 脚本?
- 读取 sql 文件后,如何 运行 在 bigquery 上查询?
谢谢
您可以使用下面的脚本在 GCS 中读取文件。我使用执行 INSERT 并保存在我的 Composer 存储桶中的 SQL 脚本对此进行了测试。
在read_gcs_op
中它将执行read_gcs_file()
和returnsql脚本的内容。 sql 脚本的内容将被 execute_query
使用并执行脚本中的查询。请参阅下面的代码:
import datetime
from airflow import models
from airflow.providers.google.cloud.hooks.gcs import GCSHook
from airflow.operators import python
from airflow.providers.google.cloud.hooks.bigquery import BigQueryHook
from google.cloud import bigquery
import logging
YESTERDAY = datetime.datetime.now() - datetime.timedelta(days=1)
BUCKET_NAME = 'your-composer-bucket'
GCS_FILES = ['sql_query.txt']
PREFIX = 'data' # populate this if you stored your sql script in a directory in the bucket
default_args = {
'owner': 'Composer Example',
'depends_on_past': False,
'email': [''],
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': datetime.timedelta(minutes=5),
'start_date': YESTERDAY,
}
with models.DAG(
'query_gcs_to_bq',
catchup=False,
default_args=default_args,
schedule_interval=datetime.timedelta(days=1)) as dag:
def read_gcs_file(**kwargs):
hook = GCSHook()
for gcs_file in GCS_FILES:
#check if PREFIX is available and initialize the gcs file to be copied
if PREFIX:
object_name = f'{PREFIX}/{gcs_file}'
else:
object_name = f'{gcs_file}'
#perform gcs hook download
resp_byte = hook.download_as_byte_array(
bucket_name = BUCKET_NAME,
object_name = object_name,
)
resp_string = resp_byte.decode("utf-8")
logging.info(resp_string)
return resp_string
read_gcs_op = python.PythonOperator(
task_id='read_gcs',
provide_context=True,
python_callable=read_gcs_file,
)
sql_query = "{{ task_instance.xcom_pull(task_ids='read_gcs') }}" # store returned value from read_gcs_op
def query_bq(sql):
hook = BigQueryHook(bigquery_conn_id="bigquery_default", delegate_to=None, use_legacy_sql=False)
client = bigquery.Client(project=hook._get_field("project"), credentials=hook._get_credentials())
client.query(sql) # If you are not doing DML, you assign this to a variable and return the value
execute_query = python.PythonOperator(
task_id='query_bq',
provide_context=True,
python_callable=query_bq,
op_kwargs = {
"sql": sql_query
}
)
read_gcs_op >> execute_query
为了测试,我使用了 INSERT 语句作为上面脚本使用的 SQL 脚本:
sql_script.txt
INSERT `your-project.dataset.your_table` (name, age)
VALUES('Brady', 44)
测试完成:
Return 任务值 read_gcs
:
在 Composer 完成执行 read_gcs
和 query_bq
后,我检查了我的 table 插入语句是否成功。:
我们使用 GCP composer(Airflow 管理)作为管弦乐工具,使用 BigQuery 作为数据库。我需要将数据从另一个 table(两个 table 都位于 bigquery db 中)推送到 table,但该方法应该是 upsert。所以我写了一个 sql 脚本,使用 marge 来更新或插入。
我有两个问题:
- 位于 GCP Composer 存储桶中的 marge 脚本,如何从存储桶中读取 sql 脚本?
- 读取 sql 文件后,如何 运行 在 bigquery 上查询?
谢谢
您可以使用下面的脚本在 GCS 中读取文件。我使用执行 INSERT 并保存在我的 Composer 存储桶中的 SQL 脚本对此进行了测试。
在read_gcs_op
中它将执行read_gcs_file()
和returnsql脚本的内容。 sql 脚本的内容将被 execute_query
使用并执行脚本中的查询。请参阅下面的代码:
import datetime
from airflow import models
from airflow.providers.google.cloud.hooks.gcs import GCSHook
from airflow.operators import python
from airflow.providers.google.cloud.hooks.bigquery import BigQueryHook
from google.cloud import bigquery
import logging
YESTERDAY = datetime.datetime.now() - datetime.timedelta(days=1)
BUCKET_NAME = 'your-composer-bucket'
GCS_FILES = ['sql_query.txt']
PREFIX = 'data' # populate this if you stored your sql script in a directory in the bucket
default_args = {
'owner': 'Composer Example',
'depends_on_past': False,
'email': [''],
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': datetime.timedelta(minutes=5),
'start_date': YESTERDAY,
}
with models.DAG(
'query_gcs_to_bq',
catchup=False,
default_args=default_args,
schedule_interval=datetime.timedelta(days=1)) as dag:
def read_gcs_file(**kwargs):
hook = GCSHook()
for gcs_file in GCS_FILES:
#check if PREFIX is available and initialize the gcs file to be copied
if PREFIX:
object_name = f'{PREFIX}/{gcs_file}'
else:
object_name = f'{gcs_file}'
#perform gcs hook download
resp_byte = hook.download_as_byte_array(
bucket_name = BUCKET_NAME,
object_name = object_name,
)
resp_string = resp_byte.decode("utf-8")
logging.info(resp_string)
return resp_string
read_gcs_op = python.PythonOperator(
task_id='read_gcs',
provide_context=True,
python_callable=read_gcs_file,
)
sql_query = "{{ task_instance.xcom_pull(task_ids='read_gcs') }}" # store returned value from read_gcs_op
def query_bq(sql):
hook = BigQueryHook(bigquery_conn_id="bigquery_default", delegate_to=None, use_legacy_sql=False)
client = bigquery.Client(project=hook._get_field("project"), credentials=hook._get_credentials())
client.query(sql) # If you are not doing DML, you assign this to a variable and return the value
execute_query = python.PythonOperator(
task_id='query_bq',
provide_context=True,
python_callable=query_bq,
op_kwargs = {
"sql": sql_query
}
)
read_gcs_op >> execute_query
为了测试,我使用了 INSERT 语句作为上面脚本使用的 SQL 脚本:
sql_script.txt
INSERT `your-project.dataset.your_table` (name, age)
VALUES('Brady', 44)
测试完成:
Return 任务值 read_gcs
:
在 Composer 完成执行 read_gcs
和 query_bq
后,我检查了我的 table 插入语句是否成功。: