如何更新 Airflow 中的 AwsBatchOperator 任务代码?

How to update an AwsBatchOperator task code in Airflow?

我在 AWS 上的 EC2 机器上部署了 airflow-scheduler 和 airflow-webserver。我使用这个 airflow-scheduler 来执行带有 AwsBatchOperator 任务的 DAG。此任务执行 EC2 机器上存在的 python 脚本。这是 DAG 的代码:

from datetime import timedelta
from airflow import DAG
from airflow.utils.dates import days_ago
from airflow.providers.amazon.aws.operators.batch import AwsBatchOperator

default_args = {
    'owner': 'admin',
    'concurrency': 3,
    'depends_on_past': True,
    'email': ['airflow@example.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
    'start_date': None,
    'end_date': None,
    'schedule_interval': None,
}

dag = DAG(
    dag_id='my-dag',
    default_args=default_args,
    description='My DAG',
    schedule_interval='00 03 * * *',
    start_date=days_ago(1),
    tags=['dev'],
)

task = AwsBatchOperator(
  dag=dag,
  job_name= 'my-job-name',
  job_definition= 'arn:aws:batch:eu-central-1:XXXX:job-definition/my-job-name',
  job_queue= 'arn:aws:batch:eu-central-1:XXXX:job-queue/my-job-name',
  region_name= 'eu-central-1',
  task_id= 'my-task-id',
  overrides={
    'command': ['python3', './my_python_script.py']
  },
  parameters= {}
)

python脚本my_python_script.py在部署airflow的EC2机器上,目录/home/ubuntu.

我在这个 python 脚本中出现了错误。我更正了它并将更正后的脚本推送到 EC2 机器上。但是,当我执行 DAG 时,我仍然收到由我更正的拼写错误引起的错误。所以这是我的问题:

如何刷新我的 DAG 以确保它使用我的 EC2 机器上存在的脚本版本?

我试过的

要更新 AwsBatchOperator 任务的代码,您需要使用新代码而不是 EC2 机器上的代码更新 AWS 批处理作业使用的 docker 图像部署气流的地方

AwsBatchOperator 可以执行 AWS 批处理作业定义 docker 图像中存在的代码,但无法执行部署了 airflow 的 EC2 机器中存在的代码。

当您使用 AwsBatchOperator 时,您可以设置要使用的作业定义。就我而言,它是 arn:aws:batch:eu-central-1:XXXX:job-definition/my-job-name。此作业定义包含将在其中执行命令的 docker 图像。参见 https://docs.aws.amazon.com/en_en/batch/latest/userguide/Batch_GetStarted.html

我很困惑,因为它与 EC2 机器和 docker 图像上存在的代码库相同。因此,无需在部署了气流的 EC2 机器上更新代码,我只需使用新版本的代码创建一个新的 docker 图像,并让我的 AWS 批处理作业使用这个新图像。