Error calling BashOperator: Bash command failed
Here is my DAG file and BashOperator task:
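from datetime import datetime

from airflow import DAG
from airflow.operators.bash_operator import BashOperator

# Trailing space after the script path avoids the TemplateNotFound error.
bash_command = "/home/jak/my_projects/workflow_env/repo_workflow/db_backup_bash.sh "

my_dag = DAG(
    dag_id='my_dag',
    start_date=datetime(year=2017, month=3, day=28),
    schedule_interval='0 1 * * *',  # daily at 01:00
)

my_bash_task = BashOperator(
    task_id="my_bash_task",
    bash_command=bash_command,
    dag=my_dag)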
I even added a space after the bash file path to avoid the TemplateNotFound error. But running this task gives me this error: airflow.exceptions.AirflowException: Bash command failed.
The contents of the bash file are:
#!/bin/bash
DATABASE=db_name
FILE=$DATABASE-`date +%F-%H%M%S`.backup
export PGPASSWORD=password
pg_dump -h localhost -p 5432 -U developer -F c -b -v -f ~/Dropbox/database_backup/location/$FILE db_name
unset PGPASSWORD
However, if instead of pointing bash_command at the bash file I write the commands in a multi-line string, it works:
bash_command = """
DATABASE=db_name
FILE=$DATABASE-`date +%F-%H%M%S`.backup
export PGPASSWORD=password
pg_dump -h localhost -p 5432 -U developer -F c -b -v -f ~/Dropbox/database_backup/location/$FILE db_name
unset PGPASSWORD
"""
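So I assume the error is not caused by the bash commands themselves.
I even tried replacing #!/bin/bash with #!/bin/sh in the bash file, but that did not help either.
Running sh db_back_up_bash.sh from the terminal works fine.
Update: the actual code: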
bash_file_location_to_backup_db = '{{"/home/jak/my_projects/workflow_env/repo_workflow/db_backup_bash.sh"}}'
# bash_file_location_to_backup_db = "/home/jak/my_projects/workflow_env/repo_workflow/db_backup_bash.sh "
bash_command = """
DATABASE=ksaprice_scraping
FILE=$DATABASE-`date +%F-%H%M%S`.backup
export PGPASSWORD=password
pg_dump -h localhost -p 5432 -U developer -F c -b -v -f ~/Dropbox/database_backup/ksaprice/$FILE ksaprice_scraping
unset PGPASSWORD
"""
backup_scraped_db_in_dropbox_task = BashOperator(
    task_id="backup_scraped_db_in_dropbox_task",
    # bash_command=bash_command,  # this works fine
    bash_command=bash_file_location_to_backup_db,  # this gives the error: airflow.exceptions.AirflowException: Bash command failed
    dag=dag_crawl
)
Error trace:
[2017-04-11 20:02:14,905] {bash_operator.py:90} INFO - Output:
2017-04-11 20:02:14,905 | INFO| root : Output:
[2017-04-11 20:02:14,906] {bash_operator.py:94} INFO - /tmp/airflowtmp7FffJ2/backup_scraped_db_in_dropbox_taskQ6IVxm: line 1: /home/jak/my_projects/workflow_env/repo_workflow/db_backup_bash.sh: Permission denied
2017-04-11 20:02:14,906 | INFO| root : /tmp/airflowtmp7FffJ2/backup_scraped_db_in_dropbox_taskQ6IVxm: line 1: /home/jak/my_projects/workflow_env/repo_workflow/db_backup_bash.sh: Permission denied
[2017-04-11 20:02:14,906] {bash_operator.py:97} INFO - Command exited with return code 126
2017-04-11 20:02:14,906 | INFO| root : Command exited with return code 126
[2017-04-11 20:02:14,906] {models.py:1417} ERROR - Bash command failed
Traceback (most recent call last):
File "/home/jak/my_projects/workflow_env/local/lib/python2.7/site-packages/airflow/models.py", line 1374, in run
result = task_copy.execute(context=context)
File "/home/jak/my_projects/workflow_env/local/lib/python2.7/site-packages/airflow/operators/bash_operator.py", line 100, in execute
raise AirflowException("Bash command failed")
AirflowException: Bash command failed
2017-04-11 20:02:14,906 | ERROR| root : Bash command failed
Traceback (most recent call last):
File "/home/jak/my_projects/workflow_env/local/lib/python2.7/site-packages/airflow/models.py", line 1374, in run
result = task_copy.execute(context=context)
File "/home/jak/my_projects/workflow_env/local/lib/python2.7/site-packages/airflow/operators/bash_operator.py", line 100, in execute
raise AirflowException("Bash command failed")
AirflowException: Bash command failed
[2017-04-11 20:02:14,907] {models.py:1441} INFO - Marking task as FAILED.
2017-04-11 20:02:14,907 | INFO| root : Marking task as FAILED.
[2017-04-11 20:02:14,947] {models.py:1462} ERROR - Bash command failed
2017-04-11 20:02:14,947 | ERROR| root : Bash command failed
Traceback (most recent call last):
File "/home/jak/my_projects/workflow_env/bin/airflow", line 28, in <module>
args.func(args)
File "/home/jak/my_projects/workflow_env/local/lib/python2.7/site-packages/airflow/bin/cli.py", line 585, in test
ti.run(ignore_task_deps=True, ignore_ti_state=True, test_mode=True)
File "/home/jak/my_projects/workflow_env/local/lib/python2.7/site-packages/airflow/utils/db.py", line 53, in wrapper
result = func(*args, **kwargs)
File "/home/jak/my_projects/workflow_env/local/lib/python2.7/site-packages/airflow/models.py", line 1374, in run
result = task_copy.execute(context=context)
File "/home/jak/my_projects/workflow_env/local/lib/python2.7/site-packages/airflow/operators/bash_operator.py", line 100, in execute
raise AirflowException("Bash command failed")
airflow.exceptions.AirflowException: Bash command failed
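I think this is a bug in airflow: Jinja should not expect a .sh file passed to BashOperator to contain template information.
I worked around it by putting the command in a format that Jinja interprets correctly: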
bash_command = '{{"/home/jak/my_projects/workflow_env/repo_workflow/db_backup_bash.sh"}}'
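To see why both the trailing space and the '{{"..."}}' form avoid TemplateNotFound, here is a minimal sketch of the rule as I understand it: BashOperator lists '.sh' and '.bash' in its template_ext, and a bash_command string ending in one of those extensions is looked up as a Jinja template file instead of being run as a command. The function name below is hypothetical and this is a simplification, not Airflow's actual code.

```python
# Extensions that BashOperator treats as template files (its `template_ext`).
TEMPLATE_EXTENSIONS = (".sh", ".bash")


def is_treated_as_template_file(bash_command):
    """Return True if the string would be looked up as a Jinja template file.

    Hypothetical helper that mirrors the suffix check; Airflow itself does
    this inside its template rendering logic.
    """
    return bash_command.endswith(TEMPLATE_EXTENSIONS)


script = "/home/jak/my_projects/workflow_env/repo_workflow/db_backup_bash.sh"

plain_path = is_treated_as_template_file(script)                  # True
with_space = is_treated_as_template_file(script + " ")            # False
jinja_literal = is_treated_as_template_file('{{"' + script + '"}}')  # False
```

In the last two cases the string is rendered as an inline template, so the script path reaches bash as a command. Whether bash can then execute that path is a separate question of file permissions.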
I changed the permissions of the bash file and it worked. Check the permissions, because airflow needs to be able to access and execute the file.
bash_command="/root/airflow/test.sh "
Hope this helps...
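The "Permission denied" line and return code 126 in the trace are what bash reports when it is asked to execute a file by path and the file has no execute bit. Running sh db_back_up_bash.sh still works in that situation because sh only reads the file. The following is a rough sketch of the fix, assuming a Linux machine with bash available; the temporary script stands in for db_backup_bash.sh and the helper names are hypothetical.

```python
import os
import stat
import subprocess
import tempfile


def make_executable(path):
    """Add execute permission for user, group, and others (like `chmod a+x`)."""
    mode = os.stat(path).st_mode
    os.chmod(path, mode | stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH)


def run_by_path(path):
    """Invoke the script by its path from bash and return the exit code."""
    return subprocess.call(["bash", "-c", path])


# Hypothetical stand-in for db_backup_bash.sh, created without execute bits.
with tempfile.NamedTemporaryFile("w", suffix=".sh", delete=False) as handle:
    handle.write("#!/bin/bash\nexit 0\n")
    script_path = handle.name
os.chmod(script_path, 0o644)

code_before = run_by_path(script_path)  # bash prints "Permission denied"
make_executable(script_path)
code_after = run_by_path(script_path)   # the script now runs directly
os.remove(script_path)
```

On the real file the equivalent is chmod +x on the script, run as a user that owns it, and the user that runs the airflow worker also needs access to the directories leading to it.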