Airflow dag bash operator permission denied
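I'm fairly new to Airflow and I'm trying to run an ETL process every 5 minutes. I have an Airflow DAG scheduled to run every 5 minutes, but it fails with the error message "ERROR - Bash command failed", and the task output shows "Permission denied".
The DAG is basically an ETL process with one BashOperator (the task that fails) and three PythonOperators that run downstream of the BashOperator.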
from airflow import DAG
from datetime import datetime, timedelta
from airflow.operators.python_operator import PythonOperator
from airflow.operators.bash_operator import BashOperator
from airflow.contrib.sensors.file_sensor import FileSensor
from bin.int_medications import int_meds_auto_updt, storage, insert, del_stag, int_med_stag_clean

# DAG_DEFAULT_ARGS is defined elsewhere (not shown in the question).
dag3 = DAG(dag_id='int_meds_dag_v1',
           start_date=datetime(2019, 10, 10),
           default_args=DAG_DEFAULT_ARGS,
           schedule_interval='*/5 * * * *',
           catchup=False)

cmd_command = "/home/akash/airflow/dags/bin/int_medications/"

# Every task is attached to dag3 explicitly.
data_loading = BashOperator(
    task_id="int_meds",
    bash_command=cmd_command,
    dag=dag3)
data_cleaning = PythonOperator(task_id='data_cleaning', python_callable=int_med_stag_clean.clean_stag, dag=dag3)
data_insert = PythonOperator(task_id='data_insert', python_callable=insert.insert_stag, dag=dag3)
data_delete = PythonOperator(task_id='data_delete', python_callable=del_stag.delete_stag, dag=dag3)

data_loading >> data_cleaning >> data_insert >> data_delete
*** Reading local file: /home/akash/airflow/logs/int_meds_dag_v1/int_meds/2019-10-10T14:45:00+00:00/1.log
[2019-10-10 10:50:26,649] {} INFO - Dependencies all met for <TaskInstance: int_meds_dag_v1.int_meds 2019-10-10T14:45:00+00:00 [queued]>
[2019-10-10 10:50:26,652] {} INFO - Dependencies all met for <TaskInstance: int_meds_dag_v1.int_meds 2019-10-10T14:45:00+00:00 [queued]>
[2019-10-10 10:50:26,652] {} INFO -
[2019-10-10 10:50:26,652] {} INFO - Starting attempt 1 of 1
[2019-10-10 10:50:26,652] {} INFO -
[2019-10-10 10:50:26,659] {} INFO - Executing <Task(BashOperator): int_meds> on 2019-10-10T14:45:00+00:00
[2019-10-10 10:50:26,659] {} INFO - Running: ['airflow', 'run', 'int_meds_dag_v1', 'int_meds', '2019-10-10T14:45:00+00:00', '--job_id', '15495', '--raw', '-sd', 'DAGS_FOLDER/', '--cfg_path', '/tmp/tmpenegd6zi']
[2019-10-10 10:50:28,319] {} INFO - Job 15495: Subtask int_meds [2019-10-10 10:50:28,318] {} INFO - Using executor SequentialExecutor
[2019-10-10 10:50:28,436] {} INFO - Job 15495: Subtask int_meds [2019-10-10 10:50:28,436] {} INFO - Filling up the DagBag from /home/akash/airflow/dags/
[2019-10-10 10:50:29,739] {} INFO - Job 15495: Subtask int_meds [2019-10-10 10:50:29,739] {} INFO - Running <TaskInstance: int_meds_dag_v1.int_meds 2019-10-10T14:45:00+00:00 [running]> on host TRLPowerSpec.local
[2019-10-10 10:50:29,751] {} INFO - Tmp dir root location:
[2019-10-10 10:50:29,751] {} INFO - Exporting the following env vars:
[2019-10-10 10:50:29,751] {} INFO - Temporary script location: /tmp/airflowtmp7a1q6w0c/int_medsykc0by4v
[2019-10-10 10:50:29,751] {} INFO - Running command: /home/akash/airflow/dags/bin/int_medications/
[2019-10-10 10:50:29,756] {} INFO - Output:
[2019-10-10 10:50:29,757] {} INFO - /tmp/airflowtmp7a1q6w0c/int_medsykc0by4v: line 1: /home/akash/airflow/dags/bin/int_medications/ Permission denied
[2019-10-10 10:50:29,757] {} INFO - Command exited with return code 126
[2019-10-10 10:50:29,760] {} ERROR - Bash command failed
Traceback (most recent call last):
File "/home/akash/miniconda3/lib/python3.7/site-packages/airflow/models/", line 1441, in _run_raw_task
result = task_copy.execute(context=context)
File "/home/akash/miniconda3/lib/python3.7/site-packages/airflow/operators/", line 135, in execute
raise AirflowException("Bash command failed")
airflow.exceptions.AirflowException: Bash command failed
[2019-10-10 10:50:29,761] {} INFO - Marking task as FAILED.
[2019-10-10 10:50:29,768] {} INFO - Job 15495: Subtask int_meds Traceback (most recent call last):
[2019-10-10 10:50:29,768] {} INFO - Job 15495: Subtask int_meds File "/home/akash/miniconda3/bin/airflow", line 32, in <module>
[2019-10-10 10:50:29,768] {} INFO - Job 15495: Subtask int_meds args.func(args)
[2019-10-10 10:50:29,768] {} INFO - Job 15495: Subtask int_meds File "/home/akash/miniconda3/lib/python3.7/site-packages/airflow/utils/", line 74, in wrapper
[2019-10-10 10:50:29,768] {} INFO - Job 15495: Subtask int_meds return f(*args, **kwargs)
[2019-10-10 10:50:29,769] {} INFO - Job 15495: Subtask int_meds File "/home/akash/miniconda3/lib/python3.7/site-packages/airflow/bin/", line 523, in run
[2019-10-10 10:50:29,769] {} INFO - Job 15495: Subtask int_meds _run(args, dag, ti)
[2019-10-10 10:50:29,769] {} INFO - Job 15495: Subtask int_meds File "/home/akash/miniconda3/lib/python3.7/site-packages/airflow/bin/", line 442, in _run
[2019-10-10 10:50:29,769] {} INFO - Job 15495: Subtask int_meds pool=args.pool,
[2019-10-10 10:50:29,769] {} INFO - Job 15495: Subtask int_meds File "/home/akash/miniconda3/lib/python3.7/site-packages/airflow/utils/", line 73, in wrapper
[2019-10-10 10:50:29,769] {} INFO - Job 15495: Subtask int_meds return func(*args, **kwargs)
[2019-10-10 10:50:29,769] {} INFO - Job 15495: Subtask int_meds File "/home/akash/miniconda3/lib/python3.7/site-packages/airflow/models/", line 1441, in _run_raw_task
[2019-10-10 10:50:29,769] {} INFO - Job 15495: Subtask int_meds result = task_copy.execute(context=context)
[2019-10-10 10:50:29,769] {} INFO - Job 15495: Subtask int_meds File "/home/akash/miniconda3/lib/python3.7/site-packages/airflow/operators/", line 135, in execute
[2019-10-10 10:50:29,769] {} INFO - Job 15495: Subtask int_meds raise AirflowException("Bash command failed")
[2019-10-10 10:50:29,769] {} INFO - Job 15495: Subtask int_meds airflow.exceptions.AirflowException: Bash command failed
[2019-10-10 10:50:31,649] {} INFO - [2019-10-10 10:50:31,649] {} INFO - Task exited with return code 1
Grant permissions on the Python file:
sudo chmod -R -f 777 /path/to/file
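If the only goal is to let the Airflow user execute the script, a narrower change than mode 777 may be enough. The following is a minimal sketch, assuming the task fails because the file lacks the execute bit. `make_executable` is a hypothetical helper, and the temporary file stands in for the real script.

```python
import os
import stat
import tempfile


def make_executable(path):
    """Add the owner-execute bit to path, leaving the other mode bits unchanged."""
    mode = os.stat(path).st_mode
    os.chmod(path, mode | stat.S_IXUSR)


# Demonstrate on a throwaway file instead of the real script.
with tempfile.TemporaryDirectory() as tmp:
    script = os.path.join(tmp, "example_script.py")
    with open(script, "w") as fh:
        fh.write("#!/usr/bin/env python\nprint('hello')\n")

    before = bool(os.stat(script).st_mode & stat.S_IXUSR)
    make_executable(script)
    after = bool(os.stat(script).st_mode & stat.S_IXUSR)

# Shows whether the owner-execute bit was set before and after the call.
print(before, after)
```

Note that a script run directly by its path also needs a shebang line such as `#!/usr/bin/env python`, otherwise the shell will not know which interpreter to use.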
The BashOperator expects its bash_command argument to be either a bash file (in which case the file extension should be .sh) or a Bash command. Try changing cmd_command to:
cmd_command = "python /home/akash/airflow/dags/bin/int_medications/"
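The difference between the two command forms can be reproduced outside Airflow. The sketch below is an illustration under assumptions, not the BashOperator's actual implementation: it writes a throwaway script without the execute bit, then runs it through `bash -c`, first as a bare path and then prefixed with the Python interpreter. It assumes a POSIX system with `bash` on the PATH; `example_etl.py` is a hypothetical stand-in for the real script.

```python
import os
import subprocess
import sys
import tempfile

with tempfile.TemporaryDirectory() as tmp:
    script = os.path.join(tmp, "example_etl.py")  # hypothetical stand-in script
    with open(script, "w") as fh:
        fh.write("print('loaded')\n")
    os.chmod(script, 0o644)  # readable, but not executable

    # Bare path: bash tries to execute the file itself and is refused.
    direct = subprocess.run(
        ["bash", "-c", f'"{script}"'],
        stdout=subprocess.PIPE, stderr=subprocess.PIPE)

    # Interpreter prefix: Python only needs read access to the file.
    via_python = subprocess.run(
        ["bash", "-c", f'"{sys.executable}" "{script}"'],
        stdout=subprocess.PIPE, stderr=subprocess.PIPE)

print(direct.returncode)      # non-zero: the shell could not execute the file
print(via_python.returncode)  # zero when the interpreter ran the script
```

The non-zero code from the bare-path run corresponds to the "Command exited with return code 126" line in the task log, which is the shell's way of saying the command was found but could not be executed.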
Alternatively, you could use a PythonOperator instead and run the code from the Python file directly.
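One way to follow that suggestion is to hand the PythonOperator a callable that runs the script in-process. This is a minimal sketch, assuming the script is meant to be run as `__main__`; `run_script` and the throwaway file are hypothetical, and the real script path would take their place.

```python
import contextlib
import io
import os
import runpy
import tempfile


def run_script(path):
    """Execute the Python file at path as if it had been run as a script."""
    # Nothing is returned, so no large object is handed back to the caller.
    runpy.run_path(path, run_name="__main__")


# Stand-in for the real ETL script, which only needs to be readable.
with tempfile.TemporaryDirectory() as tmp:
    script = os.path.join(tmp, "example_etl.py")
    with open(script, "w") as fh:
        fh.write("print('loaded')\n")

    # Capture what the script prints so the effect is visible here.
    buf = io.StringIO()
    with contextlib.redirect_stdout(buf):
        run_script(script)
    output = buf.getvalue()

print(output.strip())

# In the DAG this could be wired up roughly as follows (assumed, not from the question):
# data_loading = PythonOperator(task_id="int_meds", python_callable=run_script,
#                               op_kwargs={"path": "/path/to/script.py"}, dag=dag3)
```

Because the file is read and executed by the interpreter that is already running, it does not need the execute bit at all.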