How to connect Google Cloud Composer (Airflow) with Google Sheets and extract the info to Google Storage (bucket)

I am trying to read a Google Sheet, for example:

STORE_NUM | STORE_NAME | OPENING_AT | CLOSING_AT
01 | name_a | 9:00 | 20:00
02 | name_b | 9:00 | 20:00
03 | name_c | 9:00 | 20:00
04 | name_d | 9:00 | 20:00

I only want to read the rows corresponding to A3 through D5 (STORE_NUM is in the first row, i.e. cell A1). Once read, I want to build a file from it (csv, avro, etc.) and drop it into a GCS bucket; afterwards I will read the file from the bucket and sink it into BigQuery.

I need to do this with Composer (Airflow). So far I have been using a script that does not work, for example:

import datetime
import airflow
from airflow import DAG
from airflow.operators.dummy import DummyOperator
from airflow.providers.google.cloud.transfers.sheets_to_gcs import GoogleSheetsToGCSOperator

BUCKET = "name_of_the_bucket"
SPREADSHEET_ID = "sheet_id" # <- How to add the sheet's name and range? e.g. ShName!A5:C10

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime.datetime.today(),
    #'end_date': ,
    'email': ['some_person_name@some_company_name.com'],
    'email_on_failure': True,
    'email_on_retry': True,
    #'retries': 1,
    'retry_delay': datetime.timedelta(minutes=1)
}

sch_interval = "00 10 * * 1"

dag = DAG(
    'GSh_to_GCS_lab',
    default_args=default_args,
    tags=["example"],
    catchup=False,
    schedule_interval=sch_interval)

upload_sheet_to_gcs = GoogleSheetsToGCSOperator(
    task_id="upload_sheet_to_gcs",
    destination_bucket=BUCKET,
    spreadsheet_id=SPREADSHEET_ID,
    gcp_conn_id='google_cloud_default',
    dag=dag
)

start = DummyOperator(task_id='Starting', dag=dag)

start >> upload_sheet_to_gcs

So far I have run into authentication problems, but I may have other errors that I am not able to identify.

I am working with:

I have read some documentation, but honestly I have not been able to put all the pieces together to get it right.

I am new to both Composer and Airflow, so please be as detailed as possible. How can I accomplish what I need, at least the first stage (reading the info from the Google Sheet and loading it into the bucket as a file)?

Any helpful information that gets me closer would be greatly appreciated.

Thanks in advance.

I was able to reproduce the part that loads the Google Sheet into the bucket using this script:

import datetime

from airflow import models
from airflow.operators import bash
from airflow.providers.google.cloud.transfers.sheets_to_gcs import GoogleSheetsToGCSOperator

YESTERDAY = datetime.datetime.now() - datetime.timedelta(days=1)
BUCKET = 'your-target-bucket'
SHEET_ID = '<google_sheet_id>'

default_args = {
    'owner': 'Composer Example',
    'depends_on_past': False,
    'email': [''],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': datetime.timedelta(minutes=5),
    'start_date': YESTERDAY,
}

with models.DAG(
        'drive_to_gcs',
        catchup=False,
        default_args=default_args,
        schedule_interval=datetime.timedelta(days=1)) as dag:

    # Print the dag_run id from the Airflow logs
    # print_dag_run_conf = bash.BashOperator(
    #     task_id='print_dag_run_conf', bash_command='echo {{ dag_run.id }}')
    upload_sheet_to_gcs = GoogleSheetsToGCSOperator(
        task_id="upload_sheet_to_gcs",
        destination_bucket=BUCKET,
        spreadsheet_id=SHEET_ID,
        gcp_conn_id='google_cloud_default'
    )

    upload_sheet_to_gcs
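Regarding the comment in the question about passing the sheet's name: the operator appears to accept a sheet_filter argument (a list of sheet titles, not A1 ranges) and a destination_path prefix for the objects written to the bucket. A hedged variant, with the tab name and the prefix as assumptions, that would sit inside the same DAG:

from airflow.providers.google.cloud.transfers.sheets_to_gcs import GoogleSheetsToGCSOperator

# Hypothetical variant of the task above (same BUCKET / SHEET_ID, inside the
# same with models.DAG(...) block). sheet_filter narrows the export to the
# listed sheet titles; destination_path prefixes the object written to GCS.
upload_named_sheet_to_gcs = GoogleSheetsToGCSOperator(
    task_id="upload_named_sheet_to_gcs",
    destination_bucket=BUCKET,
    spreadsheet_id=SHEET_ID,
    sheet_filter=["Sheet1"],           # assumption: the tab is called "Sheet1"
    destination_path="sheets/stores",  # assumption: desired object prefix
    gcp_conn_id="google_cloud_default",
)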

Just make sure to:

- Add the service account used when the Composer environment was created as an Editor on your Google Sheet.
- Enable the Google Sheets API.
- Per this documentation, add https://www.googleapis.com/auth/drive to the API scopes after creating the environment and use an Airflow 2 image version:
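For the range part of the question (e.g. Sheet1!A3:D5): as far as I can tell the operator does not take an A1-style range, so one alternative is to read the range with the Sheets hook from a PythonOperator and upload the CSV yourself. A minimal sketch, assuming the range and object name shown (BUCKET and SHEET_ID are the variables from the script above):

import csv
import io

from airflow.operators.python import PythonOperator
from airflow.providers.google.cloud.hooks.gcs import GCSHook
from airflow.providers.google.suite.hooks.sheets import GSheetsHook

SHEET_RANGE = "Sheet1!A3:D5"               # assumption: tab name and A1 range to read
OBJECT_NAME = "sheets/stores_subset.csv"   # assumption: target object in the bucket


def export_range_to_gcs():
    # Read only the requested cells from the spreadsheet.
    rows = GSheetsHook(gcp_conn_id="google_cloud_default").get_values(
        spreadsheet_id=SHEET_ID, range_=SHEET_RANGE
    )
    # Serialize the rows to CSV in memory and upload the result to the bucket.
    buffer = io.StringIO()
    csv.writer(buffer).writerows(rows)
    GCSHook(gcp_conn_id="google_cloud_default").upload(
        bucket_name=BUCKET,
        object_name=OBJECT_NAME,
        data=buffer.getvalue(),
        mime_type="text/csv",
    )


# Inside the with models.DAG(...) block of the script above:
upload_range_to_gcs = PythonOperator(
    task_id="upload_range_to_gcs",
    python_callable=export_range_to_gcs,
)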

Output from the upload_sheet_to_gcs task:

*** Reading remote log from gs://us-central1-case-20220331-fde8f6be-bucket/logs/drive_to_gcs/upload_sheet_to_gcs/2022-03-31T06:37:38.140371+00:00/1.log.
[2022-03-31, 06:37:45 UTC] {taskinstance.py:1033} INFO - Dependencies all met for <TaskInstance: drive_to_gcs.upload_sheet_to_gcs manual__2022-03-31T06:37:38.140371+00:00 [queued]>
[2022-03-31, 06:37:45 UTC] {taskinstance.py:1033} INFO - Dependencies all met for <TaskInstance: drive_to_gcs.upload_sheet_to_gcs manual__2022-03-31T06:37:38.140371+00:00 [queued]>
[2022-03-31, 06:37:45 UTC] {taskinstance.py:1239} INFO - 
--------------------------------------------------------------------------------
[2022-03-31, 06:37:45 UTC] {taskinstance.py:1240} INFO - Starting attempt 1 of 2
[2022-03-31, 06:37:45 UTC] {taskinstance.py:1241} INFO - 
--------------------------------------------------------------------------------
[2022-03-31, 06:37:45 UTC] {taskinstance.py:1260} INFO - Executing <Task(GoogleSheetsToGCSOperator): upload_sheet_to_gcs> on 2022-03-31 06:37:38.140371+00:00
[2022-03-31, 06:37:45 UTC] {standard_task_runner.py:52} INFO - Started process 2572 to run task
[2022-03-31, 06:37:45 UTC] {standard_task_runner.py:76} INFO - Running: ['airflow', 'tasks', 'run', 'drive_to_gcs', 'upload_sheet_to_gcs', 'manual__2022-03-31T06:37:38.140371+00:00', '--job-id', '33', '--raw', '--subdir', 'DAGS_FOLDER/20220331.py', '--cfg-path', '/tmp/tmpia0bpqnu', '--error-file', '/tmp/tmpjioqxsfk']
[2022-03-31, 06:37:45 UTC] {standard_task_runner.py:77} INFO - Job 33: Subtask upload_sheet_to_gcs
[2022-03-31, 06:37:46 UTC] {logging_mixin.py:109} INFO - Running <TaskInstance: drive_to_gcs.upload_sheet_to_gcs manual__2022-03-31T06:37:38.140371+00:00 [running]> on host airflow-worker-7b5f8fc749-nxcfg
[2022-03-31, 06:37:46 UTC] {taskinstance.py:1426} INFO - Exporting the following env vars:
AIRFLOW_CTX_DAG_EMAIL=
AIRFLOW_CTX_DAG_OWNER=Composer Example
AIRFLOW_CTX_DAG_ID=drive_to_gcs
AIRFLOW_CTX_TASK_ID=upload_sheet_to_gcs
AIRFLOW_CTX_EXECUTION_DATE=2022-03-31T06:37:38.140371+00:00
AIRFLOW_CTX_DAG_RUN_ID=manual__2022-03-31T06:37:38.140371+00:00
[2022-03-31, 06:37:47 UTC] {credentials_provider.py:312} INFO - Getting connection using `google.auth.default()` since no key file is defined for hook.
[2022-03-31, 06:37:48 UTC] {credentials_provider.py:312} INFO - Getting connection using `google.auth.default()` since no key file is defined for hook.
[2022-03-31, 06:37:48 UTC] {gcs.py:518} INFO - File /tmp/tmp6amutoi9 uploaded to 20220331_Sheet1.csv in case_20200301 bucket
[2022-03-31, 06:37:48 UTC] {taskinstance.py:1268} INFO - Marking task as SUCCESS. dag_id=drive_to_gcs, task_id=upload_sheet_to_gcs, execution_date=20220331T063738, start_date=20220331T063745, end_date=20220331T063748
[2022-03-31, 06:37:48 UTC] {local_task_job.py:154} INFO - Task exited with return code 0
[2022-03-31, 06:37:48 UTC] {local_task_job.py:264} INFO - 0 downstream tasks scheduled from follow-on schedule check

Uploaded to the bucket:
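For the remaining stage the question mentions (reading the file from the bucket and sinking it into BigQuery), a GCSToBigQueryOperator can be chained downstream of the export task in the same DAG. A minimal sketch, with the object name, dataset and table as assumptions (the object name copies the one from the log above, but it changes per run):

from airflow.providers.google.cloud.transfers.gcs_to_bigquery import GCSToBigQueryOperator

# Inside the with models.DAG(...) block, downstream of upload_sheet_to_gcs:
load_csv_to_bq = GCSToBigQueryOperator(
    task_id="load_csv_to_bq",
    bucket=BUCKET,
    source_objects=["20220331_Sheet1.csv"],                 # assumption: object produced by the export
    destination_project_dataset_table="my_dataset.stores",  # assumption: hypothetical dataset.table
    source_format="CSV",
    skip_leading_rows=1,                 # skip the STORE_NUM header row
    autodetect=True,                     # let BigQuery infer the schema
    write_disposition="WRITE_TRUNCATE",
    gcp_conn_id="google_cloud_default",
)

upload_sheet_to_gcs >> load_csv_to_bq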