Airflow:如何从不同的服务器通过 SSH 和 运行 BashOperator
Airflow: How to SSH and run BashOperator from a different server
有没有办法使用 Airbnb 的 Airflow 连接到不同的服务器和 运行 BashOperator?
我正在尝试 运行 使用 Airflow 的配置单元 sql 命令,但我需要通过 SSH 连接到另一个盒子才能 运行 配置单元 shell。
我的任务应该是这样的:
- SSH 到服务器 1
- 启动 Hive shell
- 运行 配置单元命令
谢谢!
不适用于气流 2.x。
我想我刚刚弄明白了:
在“管理”>“连接”下的 UI 中创建 SSH 连接。注意:如果重置数据库,连接将被删除
在 Python 文件中添加以下内容
from airflow.contrib.hooks import SSHHook
sshHook = SSHHook(conn_id=<YOUR CONNECTION ID FROM THE UI>)
添加 SSH 操作员任务
t1 = SSHExecuteOperator(
task_id="task1",
bash_command=<YOUR COMMAND>,
ssh_hook=sshHook,
dag=dag)
谢谢!
Anton 的回答需要注意的一点是,对于 SSHOperator
对象,参数实际上是 ssh_conn_id
,而不是 conn_id
。至少在 1.10 版本中。
一个简单的例子看起来像
from datetime import timedelta, datetime
import airflow
from airflow import DAG
from airflow.contrib.operators.ssh_operator import SSHOperator
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'email': ['airflow@example.com'],
'email_on_failure': False,
'email_on_retry': False,
'start_date': datetime.now() - timedelta(minutes=20),
'retries': 1,
'retry_delay': timedelta(minutes=5),
}
dag = DAG(dag_id='testing_stuff',
default_args=default_args,
schedule_interval='0,10,20,30,40,50 * * * *',
dagrun_timeout=timedelta(seconds=120))
# Step 1 - Dump data from postgres databases
t1_bash = """
echo 'Hello World'
"""
t1 = SSHOperator(
ssh_conn_id='ssh_default',
task_id='test_ssh_operator',
command=t1_bash,
dag=dag)
这是一个在 Airflow 2 中使用 ssh 操作符的工作示例:
[注意:此运算符的输出是 base64 编码的]
from airflow.providers.ssh.operators.ssh import SSHOperator
from airflow.providers.ssh.hooks.ssh import SSHHook
sshHook = SSHHook(ssh_conn_id="conn-id", key_file='/opt/airflow/keys/ssh.key')
# a hook can also be defined directly in the code:
# sshHook = SSHHook(remote_host='server.com', username='admin', key_file='/opt/airflow/keys/ssh.key')
ls = SSHOperator(
task_id="ls",
command= "ls -l",
ssh_hook = sshHook,
dag = dag)
conn-id
是在Admin -> Connections里设置的。
key_file
是 ssh 私钥。
有没有办法使用 Airbnb 的 Airflow 连接到不同的服务器和 运行 BashOperator? 我正在尝试 运行 使用 Airflow 的配置单元 sql 命令,但我需要通过 SSH 连接到另一个盒子才能 运行 配置单元 shell。 我的任务应该是这样的:
- SSH 到服务器 1
- 启动 Hive shell
- 运行 配置单元命令
谢谢!
不适用于气流 2.x。
我想我刚刚弄明白了:
在“管理”>“连接”下的 UI 中创建 SSH 连接。注意:如果重置数据库,连接将被删除
在 Python 文件中添加以下内容
from airflow.contrib.hooks import SSHHook sshHook = SSHHook(conn_id=<YOUR CONNECTION ID FROM THE UI>)
添加 SSH 操作员任务
t1 = SSHExecuteOperator( task_id="task1", bash_command=<YOUR COMMAND>, ssh_hook=sshHook, dag=dag)
谢谢!
Anton 的回答需要注意的一点是,对于 SSHOperator
对象,参数实际上是 ssh_conn_id
,而不是 conn_id
。至少在 1.10 版本中。
一个简单的例子看起来像
from datetime import timedelta, datetime
import airflow
from airflow import DAG
from airflow.contrib.operators.ssh_operator import SSHOperator
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'email': ['airflow@example.com'],
'email_on_failure': False,
'email_on_retry': False,
'start_date': datetime.now() - timedelta(minutes=20),
'retries': 1,
'retry_delay': timedelta(minutes=5),
}
dag = DAG(dag_id='testing_stuff',
default_args=default_args,
schedule_interval='0,10,20,30,40,50 * * * *',
dagrun_timeout=timedelta(seconds=120))
# Step 1 - Dump data from postgres databases
t1_bash = """
echo 'Hello World'
"""
t1 = SSHOperator(
ssh_conn_id='ssh_default',
task_id='test_ssh_operator',
command=t1_bash,
dag=dag)
这是一个在 Airflow 2 中使用 ssh 操作符的工作示例:
[注意:此运算符的输出是 base64 编码的]
from airflow.providers.ssh.operators.ssh import SSHOperator
from airflow.providers.ssh.hooks.ssh import SSHHook
sshHook = SSHHook(ssh_conn_id="conn-id", key_file='/opt/airflow/keys/ssh.key')
# a hook can also be defined directly in the code:
# sshHook = SSHHook(remote_host='server.com', username='admin', key_file='/opt/airflow/keys/ssh.key')
ls = SSHOperator(
task_id="ls",
command= "ls -l",
ssh_hook = sshHook,
dag = dag)
conn-id
是在Admin -> Connections里设置的。
key_file
是 ssh 私钥。