如何使用气流 ssh_operator 执行 nohup 命令?

How can I do a nohup command using airflows ssh_operator?

我是 airflow 的新手,我正在尝试 运行 使用 airflow 的 ssh_operator 在 ec2 实例上进行作业,如下所示:

t2 = SSHOperator(
    ssh_conn_id='ec2_ssh_connection',
    task_id='execute_script',
    command="nohup python test.py &",
    retries=3,
    dag=dag)

这项工作需要几个小时,我希望 airflow 执行 python 脚本并结束。但是,当执行命令并且 dag 完成时,脚本将在 ec2 实例上终止。我还注意到上面的代码没有创建 nohup.out 文件。

我正在研究如何使用 SSHOperator 运行 nohup。这似乎是一个 python 相关的问题,因为在执行 nohup 时我在 EC2 脚本上收到以下错误:

[Errno 32] Broken pipe

谢谢!

Airflow 的 SSHHook 使用 Paramiko 模块进行 SSH 连接。有 关于 Pramiko 和 nohup。其中一个答案建议在 nohup 命令之后添加 sleep 。我无法确切解释原因,但它确实有效。 SSHOperator.

中也需要设置get_pty=True

这是一个演示解决方案的完整示例:

from datetime import datetime
from airflow import DAG
from airflow.contrib.operators.ssh_operator import SSHOperator


default_args = {
    'start_date': datetime(2001, 2, 3, 4, 0),
}

with DAG(
    'a_dag', schedule_interval=None, default_args=default_args, catchup=False,
) as dag:
    op = SSHOperator(
        task_id='ssh',
        ssh_conn_id='ssh_default',
        command=(
            'nohup python -c "import time;time.sleep(30);print(1)" & sleep 10'
        ),
        get_pty=True,  # This is needed!
    )

nohup.out 文件被写入用户的$HOME