How to connect Google Cloud Composer (Airflow) with Google Sheets and extract the info to Google Storage (bucket)
I am trying to read a Google Sheet, for example:
STORE_NUM | STORE_NAME | OPENING_AT | CLOSING_AT
01 | name_a | 9:00 | 20:00
02 | name_b | 9:00 | 20:00
03 | name_c | 9:00 | 20:00
04 | name_d | 9:00 | 20:00
I only want to read the rows corresponding to A3 through D5 (STORE_NUM is in the first row, i.e. cell A1). Once read, I want to build a file from it (CSV, Avro, etc.), drop it into a GCS bucket, and then read that file from the bucket and "sink" it into BigQuery.
I need to do this with Composer (Airflow). So far I have been using a script that does not work, like this:
import datetime
import airflow
from airflow import DAG
from airflow.operators.dummy import DummyOperator
from airflow.providers.google.cloud.transfers.sheets_to_gcs import GoogleSheetsToGCSOperator
BUCKET = "name_of_the_bucket"
SPREADSHEET_ID = "sheet_id" # <- How to add the sheet's name and range? e.g. ShName!A5:C10
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime.datetime.today(),
    # 'end_date': ,
    'email': ['some_person_name@some_company_name.com'],
    'email_on_failure': True,
    'email_on_retry': True,
    # 'retries': 1,
    'retry_delay': datetime.timedelta(minutes=1)
}

sch_interval = "00 10 * * 1"

dag = DAG(
    'GSh_to_GCS_lab',
    default_args=default_args,
    tags=["example"],
    catchup=False,
    schedule_interval=sch_interval)

upload_sheet_to_gcs = GoogleSheetsToGCSOperator(
    task_id="upload_sheet_to_gcs",
    destination_bucket=BUCKET,
    spreadsheet_id=SPREADSHEET_ID,
    gcp_conn_id='google_cloud_default',
    dag=dag
)
start = DummyOperator(task_id='Starting', dag=dag)
start >> upload_sheet_to_gcs
So far I have had problems with authentication, but there may also be other mistakes I'm not able to identify.
I'm working with: (screenshot of my Composer/Airflow environment versions)
I have read some documentation, but honestly I haven't been able to put all the pieces together to get it right.
I'm new to both Composer and Airflow, so please be as detailed as possible. How can I accomplish what I need? At least the first stage (reading the info from the Google Sheet and loading it into the bucket as a file).
Any useful information that gets me closer would be greatly appreciated.
Thanks in advance.
I was able to reproduce the part that loads the Google Sheet into a bucket using this script:
import datetime
from airflow import models
from airflow.operators import bash
from airflow.providers.google.cloud.transfers.sheets_to_gcs import GoogleSheetsToGCSOperator
YESTERDAY = datetime.datetime.now() - datetime.timedelta(days=1)
BUCKET = 'your-target-bucket'
SHEET_ID = '<google_sheet_id>'
default_args = {
    'owner': 'Composer Example',
    'depends_on_past': False,
    'email': [''],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': datetime.timedelta(minutes=5),
    'start_date': YESTERDAY,
}

with models.DAG(
        'drive_to_gcs',
        catchup=False,
        default_args=default_args,
        schedule_interval=datetime.timedelta(days=1)) as dag:

    # Print the dag_run id from the Airflow logs
    # print_dag_run_conf = bash_operator.BashOperator(
    #     task_id='print_dag_run_conf', bash_command='echo {{ dag_run.id }}')

    upload_sheet_to_gcs = GoogleSheetsToGCSOperator(
        task_id="upload_sheet_to_gcs",
        destination_bucket=BUCKET,
        spreadsheet_id=SHEET_ID,
        gcp_conn_id='google_cloud_default'
    )

    upload_sheet_to_gcs
Just make sure that you:
- add the service account used to create the Composer environment as an Editor on your Google Sheet;
- enable the Google Sheets API;
- add https://www.googleapis.com/auth/drive to the environment's API scopes, as described in this documentation;
- use Airflow 2 as the image version.
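To answer the range question from the script in the question (e.g. ShName!A5:C10): GoogleSheetsToGCSOperator exports whole worksheets (its sheet_filter argument only selects sheets by title, not cell ranges). If you only need a fixed range such as A3:D5, one option is to read that range yourself with GSheetsHook and write the CSV to the bucket with GCSHook from a PythonOperator. A minimal sketch, assuming the worksheet is called Sheet1 and an arbitrary object name; it reuses the SHEET_ID and BUCKET constants and should live inside the with models.DAG(...) block of the script above:

import csv
import io

from airflow.operators.python import PythonOperator
from airflow.providers.google.cloud.hooks.gcs import GCSHook
from airflow.providers.google.suite.hooks.sheets import GSheetsHook


def export_range_to_gcs():
    # Read only the requested cells; the worksheet name "Sheet1" is an assumption.
    rows = GSheetsHook(gcp_conn_id='google_cloud_default').get_values(
        spreadsheet_id=SHEET_ID,
        range_='Sheet1!A3:D5',
    )
    # Serialize the rows to CSV in memory.
    buf = io.StringIO()
    csv.writer(buf).writerows(rows)
    # Upload the CSV text as an object in the target bucket.
    GCSHook(gcp_conn_id='google_cloud_default').upload(
        bucket_name=BUCKET,
        object_name='stores_a3_d5.csv',  # hypothetical object name
        data=buf.getvalue(),
        mime_type='text/csv',
    )


export_range_task = PythonOperator(
    task_id='export_range_to_gcs',
    python_callable=export_range_to_gcs,
)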
Output:
*** Reading remote log from gs://us-central1-case-20220331-fde8f6be-bucket/logs/drive_to_gcs/upload_sheet_to_gcs/2022-03-31T06:37:38.140371+00:00/1.log.
[2022-03-31, 06:37:45 UTC] {taskinstance.py:1033} INFO - Dependencies all met for <TaskInstance: drive_to_gcs.upload_sheet_to_gcs manual__2022-03-31T06:37:38.140371+00:00 [queued]>
[2022-03-31, 06:37:45 UTC] {taskinstance.py:1033} INFO - Dependencies all met for <TaskInstance: drive_to_gcs.upload_sheet_to_gcs manual__2022-03-31T06:37:38.140371+00:00 [queued]>
[2022-03-31, 06:37:45 UTC] {taskinstance.py:1239} INFO -
--------------------------------------------------------------------------------
[2022-03-31, 06:37:45 UTC] {taskinstance.py:1240} INFO - Starting attempt 1 of 2
[2022-03-31, 06:37:45 UTC] {taskinstance.py:1241} INFO -
--------------------------------------------------------------------------------
[2022-03-31, 06:37:45 UTC] {taskinstance.py:1260} INFO - Executing <Task(GoogleSheetsToGCSOperator): upload_sheet_to_gcs> on 2022-03-31 06:37:38.140371+00:00
[2022-03-31, 06:37:45 UTC] {standard_task_runner.py:52} INFO - Started process 2572 to run task
[2022-03-31, 06:37:45 UTC] {standard_task_runner.py:76} INFO - Running: ['airflow', 'tasks', 'run', 'drive_to_gcs', 'upload_sheet_to_gcs', 'manual__2022-03-31T06:37:38.140371+00:00', '--job-id', '33', '--raw', '--subdir', 'DAGS_FOLDER/20220331.py', '--cfg-path', '/tmp/tmpia0bpqnu', '--error-file', '/tmp/tmpjioqxsfk']
[2022-03-31, 06:37:45 UTC] {standard_task_runner.py:77} INFO - Job 33: Subtask upload_sheet_to_gcs
[2022-03-31, 06:37:46 UTC] {logging_mixin.py:109} INFO - Running <TaskInstance: drive_to_gcs.upload_sheet_to_gcs manual__2022-03-31T06:37:38.140371+00:00 [running]> on host airflow-worker-7b5f8fc749-nxcfg
[2022-03-31, 06:37:46 UTC] {taskinstance.py:1426} INFO - Exporting the following env vars:
AIRFLOW_CTX_DAG_EMAIL=
AIRFLOW_CTX_DAG_OWNER=Composer Example
AIRFLOW_CTX_DAG_ID=drive_to_gcs
AIRFLOW_CTX_TASK_ID=upload_sheet_to_gcs
AIRFLOW_CTX_EXECUTION_DATE=2022-03-31T06:37:38.140371+00:00
AIRFLOW_CTX_DAG_RUN_ID=manual__2022-03-31T06:37:38.140371+00:00
[2022-03-31, 06:37:47 UTC] {credentials_provider.py:312} INFO - Getting connection using `google.auth.default()` since no key file is defined for hook.
[2022-03-31, 06:37:48 UTC] {credentials_provider.py:312} INFO - Getting connection using `google.auth.default()` since no key file is defined for hook.
[2022-03-31, 06:37:48 UTC] {gcs.py:518} INFO - File /tmp/tmp6amutoi9 uploaded to 20220331_Sheet1.csv in case_20200301 bucket
[2022-03-31, 06:37:48 UTC] {taskinstance.py:1268} INFO - Marking task as SUCCESS. dag_id=drive_to_gcs, task_id=upload_sheet_to_gcs, execution_date=20220331T063738, start_date=20220331T063745, end_date=20220331T063748
[2022-03-31, 06:37:48 UTC] {local_task_job.py:154} INFO - Task exited with return code 0
[2022-03-31, 06:37:48 UTC] {local_task_job.py:264} INFO - 0 downstream tasks scheduled from follow-on schedule check
The file was uploaded to the bucket.
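For the second phase from the question (reading the file from the bucket and sinking it into BigQuery), a minimal sketch using GCSToBigQueryOperator could look like this; the target table name is a placeholder, and the source object must match whatever the upload task actually wrote (this task also belongs inside the with models.DAG(...) block):

from airflow.providers.google.cloud.transfers.gcs_to_bigquery import GCSToBigQueryOperator

load_csv_to_bq = GCSToBigQueryOperator(
    task_id='load_csv_to_bq',
    bucket=BUCKET,
    # Object written by the upload task; "20220331_Sheet1.csv" is the name seen
    # in the log above and will differ per run.
    source_objects=['20220331_Sheet1.csv'],
    destination_project_dataset_table='my_project.my_dataset.stores',  # placeholder
    source_format='CSV',
    skip_leading_rows=1,     # skip the header row
    autodetect=True,         # let BigQuery infer the schema
    write_disposition='WRITE_TRUNCATE',
)

upload_sheet_to_gcs >> load_csv_to_bq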