Airflow DAG BashOperator permission denied
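I am quite new to Airflow and am trying to run an ETL process every 5 minutes. I have an Airflow DAG scheduled to run every 5 minutes, but it fails with the error message "ERROR - Bash command failed" and "Permission denied".
The DAG is essentially an ETL process with one BashOperator (the task that fails) and three PythonOperators that run downstream of it.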
from airflow import DAG
from datetime import datetime, timedelta
from airflow.operators.python_operator import PythonOperator
from airflow.operators.bash_operator import BashOperator
from airflow.contrib.sensors.file_sensor import FileSensor
from bin.int_medications import int_meds_auto_updt, storage, insert, del_stag, int_med_stag_clean
DAG_DEFAULT_ARGS = {
    'owner': 'airflow',
    'depends_on_past': False,
    'retires': 1,
}
dag3 = DAG(dag_id='int_meds_dag_v1',
           start_date=datetime(2019, 10, 10),
           default_args=DAG_DEFAULT_ARGS,
           schedule_interval='*/5 * * * *',
           catchup=False)
cmd_command = "/home/akash/airflow/dags/bin/int_medications/int_meds_auto_updt.py"
data_loading = BashOperator(
    task_id="int_meds",
    bash_command=cmd_command,
    dag=dag3)
data_cleaning = PythonOperator(task_id = 'data_cleaning', python_callable = int_med_stag_clean.clean_stag)
data_insert = PythonOperator(task_id = 'data_insert', python_callable = insert.insert_stag)
data_delete = PythonOperator(task_id = 'data_delete', python_callable = del_stag.delete_stag)
data_loading >> data_cleaning >> data_insert >> data_delete
The code of the DAG file is above; the error log is below.
*** Reading local file: /home/akash/airflow/logs/int_meds_dag_v1/int_meds/2019-10-10T14:45:00+00:00/1.log
[2019-10-10 10:50:26,649] {__init__.py:1139} INFO - Dependencies all met for <TaskInstance: int_meds_dag_v1.int_meds 2019-10-10T14:45:00+00:00 [queued]>
[2019-10-10 10:50:26,652] {__init__.py:1139} INFO - Dependencies all met for <TaskInstance: int_meds_dag_v1.int_meds 2019-10-10T14:45:00+00:00 [queued]>
[2019-10-10 10:50:26,652] {__init__.py:1353} INFO -
--------------------------------------------------------------------------------
[2019-10-10 10:50:26,652] {__init__.py:1354} INFO - Starting attempt 1 of 1
[2019-10-10 10:50:26,652] {__init__.py:1355} INFO -
--------------------------------------------------------------------------------
[2019-10-10 10:50:26,659] {__init__.py:1374} INFO - Executing <Task(BashOperator): int_meds> on 2019-10-10T14:45:00+00:00
[2019-10-10 10:50:26,659] {base_task_runner.py:119} INFO - Running: ['airflow', 'run', 'int_meds_dag_v1', 'int_meds', '2019-10-10T14:45:00+00:00', '--job_id', '15495', '--raw', '-sd', 'DAGS_FOLDER/int_med_dag.py', '--cfg_path', '/tmp/tmpenegd6zi']
[2019-10-10 10:50:28,319] {base_task_runner.py:101} INFO - Job 15495: Subtask int_meds [2019-10-10 10:50:28,318] {__init__.py:51} INFO - Using executor SequentialExecutor
[2019-10-10 10:50:28,436] {base_task_runner.py:101} INFO - Job 15495: Subtask int_meds [2019-10-10 10:50:28,436] {__init__.py:305} INFO - Filling up the DagBag from /home/akash/airflow/dags/int_med_dag.py
[2019-10-10 10:50:29,739] {base_task_runner.py:101} INFO - Job 15495: Subtask int_meds [2019-10-10 10:50:29,739] {cli.py:517} INFO - Running <TaskInstance: int_meds_dag_v1.int_meds 2019-10-10T14:45:00+00:00 [running]> on host TRLPowerSpec.local
[2019-10-10 10:50:29,751] {bash_operator.py:81} INFO - Tmp dir root location:
/tmp
[2019-10-10 10:50:29,751] {bash_operator.py:90} INFO - Exporting the following env vars:
AIRFLOW_CTX_DAG_ID=int_meds_dag_v1
AIRFLOW_CTX_TASK_ID=int_meds
AIRFLOW_CTX_EXECUTION_DATE=2019-10-10T14:45:00+00:00
AIRFLOW_CTX_DAG_RUN_ID=scheduled__2019-10-10T14:45:00+00:00
[2019-10-10 10:50:29,751] {bash_operator.py:104} INFO - Temporary script location: /tmp/airflowtmp7a1q6w0c/int_medsykc0by4v
[2019-10-10 10:50:29,751] {bash_operator.py:114} INFO - Running command: /home/akash/airflow/dags/bin/int_medications/int_meds_auto_updt.py
[2019-10-10 10:50:29,756] {bash_operator.py:123} INFO - Output:
[2019-10-10 10:50:29,757] {bash_operator.py:127} INFO - /tmp/airflowtmp7a1q6w0c/int_medsykc0by4v: line 1: /home/akash/airflow/dags/bin/int_medications/int_meds_auto_updt.py: Permission denied
[2019-10-10 10:50:29,757] {bash_operator.py:131} INFO - Command exited with return code 126
[2019-10-10 10:50:29,760] {__init__.py:1580} ERROR - Bash command failed
Traceback (most recent call last):
File "/home/akash/miniconda3/lib/python3.7/site-packages/airflow/models/__init__.py", line 1441, in _run_raw_task
result = task_copy.execute(context=context)
File "/home/akash/miniconda3/lib/python3.7/site-packages/airflow/operators/bash_operator.py", line 135, in execute
raise AirflowException("Bash command failed")
airflow.exceptions.AirflowException: Bash command failed
[2019-10-10 10:50:29,761] {__init__.py:1611} INFO - Marking task as FAILED.
[2019-10-10 10:50:29,768] {base_task_runner.py:101} INFO - Job 15495: Subtask int_meds Traceback (most recent call last):
[2019-10-10 10:50:29,768] {base_task_runner.py:101} INFO - Job 15495: Subtask int_meds File "/home/akash/miniconda3/bin/airflow", line 32, in <module>
[2019-10-10 10:50:29,768] {base_task_runner.py:101} INFO - Job 15495: Subtask int_meds args.func(args)
[2019-10-10 10:50:29,768] {base_task_runner.py:101} INFO - Job 15495: Subtask int_meds File "/home/akash/miniconda3/lib/python3.7/site-packages/airflow/utils/cli.py", line 74, in wrapper
[2019-10-10 10:50:29,768] {base_task_runner.py:101} INFO - Job 15495: Subtask int_meds return f(*args, **kwargs)
[2019-10-10 10:50:29,769] {base_task_runner.py:101} INFO - Job 15495: Subtask int_meds File "/home/akash/miniconda3/lib/python3.7/site-packages/airflow/bin/cli.py", line 523, in run
[2019-10-10 10:50:29,769] {base_task_runner.py:101} INFO - Job 15495: Subtask int_meds _run(args, dag, ti)
[2019-10-10 10:50:29,769] {base_task_runner.py:101} INFO - Job 15495: Subtask int_meds File "/home/akash/miniconda3/lib/python3.7/site-packages/airflow/bin/cli.py", line 442, in _run
[2019-10-10 10:50:29,769] {base_task_runner.py:101} INFO - Job 15495: Subtask int_meds pool=args.pool,
[2019-10-10 10:50:29,769] {base_task_runner.py:101} INFO - Job 15495: Subtask int_meds File "/home/akash/miniconda3/lib/python3.7/site-packages/airflow/utils/db.py", line 73, in wrapper
[2019-10-10 10:50:29,769] {base_task_runner.py:101} INFO - Job 15495: Subtask int_meds return func(*args, **kwargs)
[2019-10-10 10:50:29,769] {base_task_runner.py:101} INFO - Job 15495: Subtask int_meds File "/home/akash/miniconda3/lib/python3.7/site-packages/airflow/models/__init__.py", line 1441, in _run_raw_task
[2019-10-10 10:50:29,769] {base_task_runner.py:101} INFO - Job 15495: Subtask int_meds result = task_copy.execute(context=context)
[2019-10-10 10:50:29,769] {base_task_runner.py:101} INFO - Job 15495: Subtask int_meds File "/home/akash/miniconda3/lib/python3.7/site-packages/airflow/operators/bash_operator.py", line 135, in execute
[2019-10-10 10:50:29,769] {base_task_runner.py:101} INFO - Job 15495: Subtask int_meds raise AirflowException("Bash command failed")
[2019-10-10 10:50:29,769] {base_task_runner.py:101} INFO - Job 15495: Subtask int_meds airflow.exceptions.AirflowException: Bash command failed
[2019-10-10 10:50:31,649] {logging_mixin.py:95} INFO - [2019-10-10 10:50:31,649] {jobs.py:2562} INFO - Task exited with return code 1
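I also tried granting permissions on the Python file with
sudo chmod -R -f 777 /path/to/file
However, the task still raises the same error in Airflow.
I would appreciate help understanding what the error is and how to fix it.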
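The BashOperator expects its bash_command parameter to be either a Bash script file (in which case the file extension should be .sh) or a Bash command. Here the command is the bare path to a .py file, so the shell tries to execute the file itself; return code 126 in the log means the shell found the file but could not execute it. Running the file through the Python interpreter avoids that. Try replacing cmd_command with: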
cmd_command = "python /home/akash/airflow/dags/bin/int_medications/int_meds_auto_updt.py"
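To see why prefixing the interpreter makes a difference, the following minimal sketch reproduces the behaviour outside Airflow. It assumes a POSIX system with bash on the PATH; demo_etl.py is a hypothetical stand-in for the real script, and run_both_ways is a name made up for this illustration.

```python
import os
import shlex
import stat
import subprocess
import sys
import tempfile


def run_both_ways():
    """Run a script without the execute bit directly, then via the interpreter."""
    with tempfile.TemporaryDirectory() as tmp:
        script = os.path.join(tmp, "demo_etl.py")  # hypothetical stand-in script
        with open(script, "w") as fh:
            fh.write("print('loaded')\n")
        # Owner read/write only: no execute bit on the file.
        os.chmod(script, stat.S_IRUSR | stat.S_IWUSR)

        # A bare path asks the OS to execute the file itself.
        direct = subprocess.run(
            ["bash", "-c", shlex.quote(script)],
            capture_output=True, text=True,
        )
        # With the interpreter in front, the file only has to be readable.
        command = f"{shlex.quote(sys.executable)} {shlex.quote(script)}"
        via_python = subprocess.run(
            ["bash", "-c", command],
            capture_output=True, text=True,
        )
    return direct, via_python


if __name__ == "__main__":
    direct, via_python = run_both_ways()
    print("direct:", direct.returncode, direct.stderr.strip())
    print("via python:", via_python.returncode, via_python.stdout.strip())
```

The direct call should fail in the same way as the task log above, while the interpreter-prefixed call runs the script. If the python on the worker's PATH is not the environment you expect, putting the full interpreter path in bash_command is one option.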
Alternatively, you can use a PythonOperator instead and run the code from int_meds_auto_updt.py directly.
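As a rough sketch of that second option, the helper below executes a Python file in-process with the standard library's runpy module. The names run_script, demo and demo_etl.py are hypothetical and only serve the illustration. Something like python_callable=run_script together with op_kwargs={'path': ...} could then be passed to a PythonOperator, though that wiring is an assumption and is not tested here.

```python
import os
import runpy
import tempfile


def run_script(path):
    """Execute a Python file in the current process, as if run as a script."""
    # run_name="__main__" makes any `if __name__ == "__main__":` block run too.
    # The return value is the dictionary of globals the script ended with.
    return runpy.run_path(path, run_name="__main__")


def demo():
    """Write a tiny stand-in script to a temp dir and run it via run_script."""
    with tempfile.TemporaryDirectory() as tmp:
        script = os.path.join(tmp, "demo_etl.py")  # hypothetical stand-in script
        with open(script, "w") as fh:
            fh.write("rows_loaded = 2 + 3\n")
        return run_script(script)["rows_loaded"]


if __name__ == "__main__":
    print(demo())
```

Since the DAG file already imports int_meds_auto_updt, passing one of that module's own functions as python_callable would be simpler if the script exposes one; the runpy approach is mainly useful when the script only has top-level code.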