Error after DAG is triggered in Airflow (Composer)

When my DAG is triggered to create a Dataproc cluster, the task fails and I can't figure out what the problem is. Here are the logs:

```
[2021-02-02 12:13:55,321] {taskinstance.py:671} INFO - Dependencies all met for <TaskInstance: hourly_processing.create_dataproc_cluster 2021-02-02T12:13:47.063326+00:00 [queued]>
[2021-02-02 12:13:55,322] {taskinstance.py:881} INFO -
--------------------------------------------------------------------------------
[2021-02-02 12:13:55,322] {taskinstance.py:882} INFO - Starting attempt 1 of 1
[2021-02-02 12:13:55,323] {taskinstance.py:883} INFO -
--------------------------------------------------------------------------------
[2021-02-02 12:13:55,390] {taskinstance.py:902} INFO - Executing <Task(BashOperator): create_dataproc_cluster> on 2021-02-02T12:13:47.063326+00:00
[2021-02-02 12:13:55,395] {standard_task_runner.py:54} INFO - Started process 3911 to run task
[2021-02-02 12:13:55,458] {standard_task_runner.py:77} INFO - Running: ['airflow', 'run', 'hourly_processing', 'create_dataproc_cluster', '2021-02-02T12:13:47.063326+00:00', '--job_id', '75', '--pool', 'default_pool', '--raw', '-sd', 'DAGS_FOLDER/hourlyprocess.py', '--cfg_path', '/tmp/tmpfoqft1ib']
[2021-02-02 12:13:55,460] {standard_task_runner.py:78} INFO - Job 75: Subtask create_dataproc_cluster
[2021-02-02 12:13:56,127] {logging_mixin.py:112} INFO - Running <TaskInstance: hourly_processing.create_dataproc_cluster 2021-02-02T12:13:47.063326+00:00 [running]> on host airflow-worker-5b45bf88bd-mmbvk
[2021-02-02 12:13:56,224] {bash_operator.py:114} INFO - Tmp dir root location:
 /tmp@-@{"workflow": "hourly_processing", "task-id": "create_dataproc_cluster", "execution-date": "2021-02-02T12:13:47.063326+00:00"}
[2021-02-02 12:13:56,226] {bash_operator.py:137} INFO - Temporary script location: /tmp/airflowtmp2njq_lg4/create_dataproc_clusterc4xad95g@-@{"workflow": "hourly_processing", "task-id": "create_dataproc_cluster", "execution-date": "2021-02-02T12:13:47.063326+00:00"}
[2021-02-02 12:13:56,226] {bash_operator.py:147} INFO - Running command: gcloud dataproc clusters create hourly-job --bucket my-per-project-composer-stg --region europe-west1 --zone europe-west1-b --subnet projects/my-per-project/regions/europe-west1/subnetworks/rtm-test-vpc --project my-per-project --service-account 136552104597-compute@developer.gserviceaccount.com --master-machine-type n1-standard-8 --master-boot-disk-size 500 --worker-machine-type n1-standard-16 --worker-boot-disk-size 500 --num-workers 10 --image-version 1.5-debian10 --properties core:fs.gs.implicit.dir.repair.enable=false,core:fs.gs.status.parallel.enable=true --no-address@-@{"workflow": "hourly_processing", "task-id": "create_dataproc_cluster", "execution-date": "2021-02-02T12:13:47.063326+00:00"}
[2021-02-02 12:13:56,396] {bash_operator.py:154} INFO - Output:@-@{"workflow": "hourly_processing", "task-id": "create_dataproc_cluster", "execution-date": "2021-02-02T12:13:47.063326+00:00"}
[2021-02-02 12:13:58,405] {bash_operator.py:158} INFO - Waiting on operation [projects/my-per-project/regions/europe-west1/operations/6a80afd5-1a32-3bf0-8501-5b69385c9175].@-@{"workflow": "hourly_processing", "task-id": "create_dataproc_cluster", "execution-date": "2021-02-02T12:13:47.063326+00:00"}
[2021-02-02 12:13:58,412] {bash_operator.py:158} INFO - Waiting for cluster creation operation...@-@{"workflow": "hourly_processing", "task-id": "create_dataproc_cluster", "execution-date": "2021-02-02T12:13:47.063326+00:00"}
[2021-02-02 12:13:58,469] {bash_operator.py:158} INFO - WARNING: For PD-Standard without local SSDs, we strongly recommend provisioning 1TB or larger to ensure consistently high I/O performance. See https://cloud.google.com/compute/docs/disks/performance for information on disk I/O performance.@-@{"workflow": "hourly_processing", "task-id": "create_dataproc_cluster", "execution-date": "2021-02-02T12:13:47.063326+00:00"}
[2021-02-02 12:13:58,471] {bash_operator.py:158} INFO - WARNING: Subnetwork 'rtm-test-vpc' does not support Private Google Access which is required for Dataproc clusters when 'internal_ip_only' is set to 'true'. Enable Private Google Access on subnetwork 'rtm-test-vpc' or set 'internal_ip_only' to 'false'.@-@{"workflow": "hourly_processing", "task-id": "create_dataproc_cluster", "execution-date": "2021-02-02T12:13:47.063326+00:00"}
[2021-02-02 12:44:13,316] {bash_operator.py:158} INFO - ......done.@-@{"workflow": "hourly_processing", "task-id": "create_dataproc_cluster", "execution-date": "2021-02-02T12:13:47.063326+00:00"}
[2021-02-02 12:44:13,381] {bash_operator.py:158} INFO - ERROR: (gcloud.dataproc.clusters.create) Operation [projects/my-per-project/regions/europe-west1/operations/6a80afd5-1a32-3bf0-8501-5b69385c9175] failed: Multiple Errors:@-@{"workflow": "hourly_processing", "task-id": "create_dataproc_cluster", "execution-date": "2021-02-02T12:13:47.063326+00:00"}
[2021-02-02 12:44:13,382] {bash_operator.py:158} INFO -  - Timeout waiting for instance hourly-job-m to report in.@-@{"workflow": "hourly_processing", "task-id": "create_dataproc_cluster", "execution-date": "2021-02-02T12:13:47.063326+00:00"}
[2021-02-02 12:44:13,383] {bash_operator.py:158} INFO -  - Timeout waiting for instance hourly-job-w-0 to report in.@-@{"workflow": "hourly_processing", "task-id": "create_dataproc_cluster", "execution-date": "2021-02-02T12:13:47.063326+00:00"}
[2021-02-02 12:44:13,384] {bash_operator.py:158} INFO -  - Timeout waiting for instance hourly-job-w-1 to report in.@-@{"workflow": "hourly_processing", "task-id": "create_dataproc_cluster", "execution-date": "2021-02-02T12:13:47.063326+00:00"}
[2021-02-02 12:44:13,387] {bash_operator.py:158} INFO -  - Timeout waiting for instance hourly-job-w-9 to report in..@-@{"workflow": "hourly_processing", "task-id": "create_dataproc_cluster", "execution-date": "2021-02-02T12:13:47.063326+00:00"}
[2021-02-02 12:44:13,561] {bash_operator.py:162} INFO - Command exited with return code 1@-@{"workflow": "hourly_processing", "task-id": "create_dataproc_cluster", "execution-date": "2021-02-02T12:13:47.063326+00:00"}
[2021-02-02 12:44:13,615] {taskinstance.py:1152} ERROR - Bash command failed
Traceback (most recent call last):
  File "/usr/local/lib/airflow/airflow/models/taskinstance.py", line 985, in _run_raw_task
    result = task_copy.execute(context=context)
  File "/usr/local/lib/airflow/airflow/operators/bash_operator.py", line 166, in execute
    raise AirflowException("Bash command failed")
airflow.exceptions.AirflowException: Bash command failed
[2021-02-02 12:44:13,618] {taskinstance.py:1196} INFO - Marking task as FAILED. dag_id=hourly_processing, task_id=create_dataproc_cluster, execution_date=20210202T121347, start_date=20210202T121355, end_date=20210202T124413
[2021-02-02 12:44:13,718] {local_task_job.py:159} WARNING - State of this instance has been externally set to failed. Taking the poison pill.
[2021-02-02 12:44:13,721] {helpers.py:325} INFO - Sending Signals.SIGTERM to GPID 3911
[2021-02-02 12:44:13,898] {helpers.py:291} INFO - Process psutil.Process(pid=3911, status='terminated', exitcode=1, started='12:13:54') (3911) terminated with exit code 1
[2021-02-02 12:44:13,900] {local_task_job.py:102} INFO - Task exited with return code 1
```
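Note the WARNING earlier in the log: subnetwork 'rtm-test-vpc' does not support Private Google Access, which Dataproc requires when the cluster is created with --no-address (internal IPs only). Whether the subnet has Private Google Access enabled can be checked with something like the following (subnet name, region, and project are taken from the log above; adjust to your environment):

```
gcloud compute networks subnets describe rtm-test-vpc \
    --region europe-west1 \
    --project my-per-project \
    --format="get(privateIpGoogleAccess)"
```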

The code snippet that creates the DAG:

```
from airflow import models
from airflow.operators import bash_operator


def create_dag():
    with models.DAG(
        dag_id='Hourly_processing',
        schedule_interval=None,
        default_args=default_dag_args
    ) as dag:
        # Build the gcloud command; parentheses avoid backslash line
        # continuations (a stray space after '\' is a SyntaxError).
        create_dataproc_cluster_command = (
            'gcloud dataproc clusters create ' + cluster_name +
            ' --bucket ' + bucket_name +
            ' --zone ' + zone +
            ' --subnet ' + sub_network_uri +
            ' --region ' + region +
            ' --project ' + project_id +
            ' --service-account ' + service_account +
            ' --master-machine-type ' + master_machine_type +
            ' --master-boot-disk-size 500' +
            ' --worker-machine-type ' + worker_machine_type +
            ' --worker-boot-disk-size 500' +
            ' --num-workers ' + str(no_worker_machine) +
            ' --image-version ' + image +
            ' --properties ' + properties +
            ' --no-address'
        )
        create_dataproc_cluster = bash_operator.BashOperator(
            task_id='create_dataproc_cluster',
            bash_command=create_dataproc_cluster_command)
```

The Dataproc cluster starts provisioning but eventually fails with the error above. Any help is much appreciated. Thanks in advance.

Found the issue: I had to enable Private Google Access on the VPC subnetwork being used.
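For anyone hitting the same error, Private Google Access can be enabled on the subnet with a gcloud call along these lines (subnet name, region, and project below come from the log; substitute your own):

```
gcloud compute networks subnets update rtm-test-vpc \
    --region europe-west1 \
    --project my-per-project \
    --enable-private-ip-google-access
```

After the subnet is updated, re-triggering the DAG should let the cluster nodes report in. Alternatively, as the warning in the log suggests, dropping --no-address from the create command (so the VMs get external IPs) also avoids the requirement.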