Schedule the shell script on different servers using the same DAG file

I am new to Apache Airflow and have the following situation. The code I am using is:

import airflow
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta
from airflow.contrib.operators.ssh_operator import SSHOperator
from airflow.contrib.hooks.ssh_hook import SSHHook

default_args = {
    'owner': 'john',
    'depends_on_past': False,
    'email': [''],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 0,
    'retry_delay': timedelta(minutes=5)
}

dag = DAG(
    'tutorial',
    default_args=default_args,
    description='A simple tutorial DAG',
    schedule_interval=None)

bash_tutorial = """
  echo "Execute shell file: /A/B/server/tutorial.ksh"
  echo "{{ macros.ds_format(ds, '%Y-%m-%d', '%m-%d-%Y') }}"
  source /home/johnbs/.profile
  /A/B/server/tutorial.ksh
"""

t1 = SSHOperator(
    ssh_conn_id='dev',
    task_id='tutorial.ksh',
    command=bash_tutorial,
    dag=dag
)

Using Airflow, I want to trigger the ksh script on different servers, such as the dev and test servers, i.e.

tutorial.ksh exists on the dev server (conn_id is 'dev') at the path /A/B/C/tutorial.ksh and on the test server (conn_id is 'test') at the path /A/B/D/tutorial.ksh... here you can see folder C on dev and folder D on test... which part of the code should I update?

Each SSHOperator instance executes a command on a single server, so you need one task per server. You also need to define each connection separately, as described in the docs.
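A connection can be created through the UI, the CLI, or an environment variable named AIRFLOW_CONN_<CONN_ID> holding a connection URI. The hosts and credentials below are placeholders, so adjust them to your own dev and test servers, and make sure the variables are visible to the scheduler and worker processes:

# Hypothetical hosts and credentials -- replace with your real servers
export AIRFLOW_CONN_DEV='ssh://johnbs:password@dev-server.example.com:22'
export AIRFLOW_CONN_TEST='ssh://johnbs:password@test-server.example.com:22'

With both connections defined, you can loop over the connection ids and create one SSHOperator per server: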

from airflow.operators.dummy_operator import DummyOperator

server_connection_ids = ['dev', 'test']
# Each connection id maps to the folder holding tutorial.ksh on that server
script_folders = {'dev': 'C', 'test': 'D'}

start_op = DummyOperator(task_id="start_task", dag=dag)

for conn in server_connection_ids:
    # Jinja braces are doubled again inside the f-string so the rendered
    # command still contains {{ ... }} for Airflow to template at runtime.
    bash_tutorial = f"""
      echo "Execute shell file: /A/B/{script_folders[conn]}/tutorial.ksh"
      echo "{{{{ macros.ds_format(ds, '%Y-%m-%d', '%m-%d-%Y') }}}}"
      source /home/johnbs/.profile
      /A/B/{script_folders[conn]}/tutorial.ksh
    """
    ssh_op = SSHOperator(
        ssh_conn_id=conn,
        task_id=f'ssh_{conn}_task',
        command=bash_tutorial,
        dag=dag
    )
    start_op >> ssh_op

This will create one task for each server, each running the script from that server's folder.
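As a quick sanity check, assuming the Airflow 1.10-style CLI that matches the contrib imports above, you can run one of the generated tasks in isolation, without the scheduler (in Airflow 2 the equivalent is airflow tasks test):

airflow test tutorial ssh_dev_task 2021-01-01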