如何使用气流 ssh_operator 执行 nohup 命令?
How can I do a nohup command using airflows ssh_operator?
我是 airflow 的新手,我正在尝试 运行 使用 airflow 的 ssh_operator 在 ec2 实例上进行作业,如下所示:
t2 = SSHOperator(
ssh_conn_id='ec2_ssh_connection',
task_id='execute_script',
command="nohup python test.py &",
retries=3,
dag=dag)
这项工作需要几个小时,我希望 airflow 执行 python 脚本并结束。但是,当执行命令并且 dag 完成时,脚本将在 ec2 实例上终止。我还注意到上面的代码没有创建 nohup.out 文件。
我正在研究如何使用 SSHOperator 运行 nohup。这似乎是一个 python 相关的问题,因为在执行 nohup 时我在 EC2 脚本上收到以下错误:
[Errno 32] Broken pipe
谢谢!
Airflow 的 SSHHook
使用 Paramiko 模块进行 SSH 连接。有 关于 Pramiko 和 nohup
。其中一个答案建议在 nohup
命令之后添加 sleep
。我无法确切解释原因,但它确实有效。 SSHOperator
.
中也需要设置get_pty=True
这是一个演示解决方案的完整示例:
from datetime import datetime
from airflow import DAG
from airflow.contrib.operators.ssh_operator import SSHOperator
default_args = {
'start_date': datetime(2001, 2, 3, 4, 0),
}
with DAG(
'a_dag', schedule_interval=None, default_args=default_args, catchup=False,
) as dag:
op = SSHOperator(
task_id='ssh',
ssh_conn_id='ssh_default',
command=(
'nohup python -c "import time;time.sleep(30);print(1)" & sleep 10'
),
get_pty=True, # This is needed!
)
nohup.out
文件被写入用户的$HOME
。
我是 airflow 的新手,我正在尝试 运行 使用 airflow 的 ssh_operator 在 ec2 实例上进行作业,如下所示:
t2 = SSHOperator(
ssh_conn_id='ec2_ssh_connection',
task_id='execute_script',
command="nohup python test.py &",
retries=3,
dag=dag)
这项工作需要几个小时,我希望 airflow 执行 python 脚本并结束。但是,当执行命令并且 dag 完成时,脚本将在 ec2 实例上终止。我还注意到上面的代码没有创建 nohup.out 文件。
我正在研究如何使用 SSHOperator 运行 nohup。这似乎是一个 python 相关的问题,因为在执行 nohup 时我在 EC2 脚本上收到以下错误:
[Errno 32] Broken pipe
谢谢!
Airflow 的 SSHHook
使用 Paramiko 模块进行 SSH 连接。有 nohup
。其中一个答案建议在 nohup
命令之后添加 sleep
。我无法确切解释原因,但它确实有效。 SSHOperator
.
get_pty=True
这是一个演示解决方案的完整示例:
from datetime import datetime
from airflow import DAG
from airflow.contrib.operators.ssh_operator import SSHOperator
default_args = {
'start_date': datetime(2001, 2, 3, 4, 0),
}
with DAG(
'a_dag', schedule_interval=None, default_args=default_args, catchup=False,
) as dag:
op = SSHOperator(
task_id='ssh',
ssh_conn_id='ssh_default',
command=(
'nohup python -c "import time;time.sleep(30);print(1)" & sleep 10'
),
get_pty=True, # This is needed!
)
nohup.out
文件被写入用户的$HOME
。