Schedule a shell script on different servers using the same DAG file
I'm new to Apache Airflow and I have a situation. The code I am using is:
import airflow
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta
from airflow.contrib.operators.ssh_operator import SSHOperator
from airflow.contrib.hooks.ssh_hook import SSHHook
default_args = {
    'owner': 'john',
    'depends_on_past': False,
    'email': [''],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 0,
    'retry_delay': timedelta(minutes=5)
}

dag = DAG(
    'tutorial',
    default_args=default_args,
    description='A simple tutorial DAG',
    schedule_interval=None)
bash_tutorial = """
echo "Execute shell file: /A/B/server/tutorial.ksh"
echo "{{macros.ds_format(ds, "%Y-%m-%d", "%m-%d-%Y"}}"
source /home/johnbs/.profile
/A/B/server/tutorial.ksh
"""
t1 = SSHOperator(
    ssh_conn_id='dev',
    task_id='tutorial.ksh',
    command=bash_tutorial,
    dag=dag
)
Using Airflow, I want to trigger the ksh script on different servers, such as the dev and test servers. That is, tutorial.ksh exists on the dev server (conn_id 'dev') at the path /A/B/C/tutorial.ksh, and on the test server (conn_id 'test') at the path /A/B/D/tutorial.ksh... here you can see folder C on dev and folder D on test... Which part of the code should I update?
Each instance of SSHOperator executes a command on a single server.
You need to define each connection separately, as described in the docs.
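One way to register the two connections (a minimal sketch, assuming Airflow's AIRFLOW_CONN_<CONN_ID> environment-variable mechanism; the host names and login are placeholders, not taken from the question):

# Airflow resolves a connection id against an environment variable named
# AIRFLOW_CONN_<CONN_ID> that holds a connection URI. In practice these are
# exported in the scheduler/worker environment rather than set in the DAG
# file; "dev-host", "test-host", and "johnbs" are placeholder values.
import os
os.environ["AIRFLOW_CONN_DEV"] = "ssh://johnbs@dev-host:22"
os.environ["AIRFLOW_CONN_TEST"] = "ssh://johnbs@test-host:22"

With both connections defined, you can then do the following: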
from airflow.operators.dummy_operator import DummyOperator

server_connection_ids = ['dev', 'test']

start_op = DummyOperator(task_id="start_task", dag=dag)

for conn in server_connection_ids:
    # Quadruple braces render as a literal {{ }} in the f-string, leaving
    # the Jinja macro for Airflow to template at runtime.
    bash_tutorial = f"""
    echo "Execute shell file: /A/B/{conn}/tutorial.ksh"
    echo "{{{{ macros.ds_format(ds, '%Y-%m-%d', '%m-%d-%Y') }}}}"
    source /home/johnbs/.profile
    /A/B/{conn}/tutorial.ksh
    """
    ssh_op = SSHOperator(
        ssh_conn_id=conn,
        task_id=f'ssh_{conn}_task',
        command=bash_tutorial,
        dag=dag
    )
    start_op >> ssh_op
This will create one task per server.
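Note that the loop above uses the connection id itself as the folder name (/A/B/dev/... and /A/B/test/...). In the question the folders differ from the connection ids (C on dev, D on test), so a small mapping from connection id to folder keeps the same pattern. A minimal sketch, reusing dag and start_op from above (the server_folders dict is illustrative):

# Illustrative mapping from connection id to the server-specific folder
# named in the question: C on the dev server, D on the test server.
server_folders = {'dev': 'C', 'test': 'D'}

for conn, folder in server_folders.items():
    bash_tutorial = f"""
    echo "Execute shell file: /A/B/{folder}/tutorial.ksh"
    source /home/johnbs/.profile
    /A/B/{folder}/tutorial.ksh
    """
    ssh_op = SSHOperator(
        ssh_conn_id=conn,
        task_id=f'ssh_{conn}_task',
        command=bash_tutorial,
        dag=dag
    )
    start_op >> ssh_op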