如何更新 Airflow 中的 AwsBatchOperator 任务代码?
How to update an AwsBatchOperator task code in Airflow?
我在 AWS 上的 EC2 机器上部署了 airflow-scheduler 和 airflow-webserver。我使用这个 airflow-scheduler 来执行带有 AwsBatchOperator
任务的 DAG。此任务执行 EC2 机器上存在的 python 脚本。这是 DAG 的代码:
from datetime import timedelta
from airflow import DAG
from airflow.utils.dates import days_ago
from airflow.providers.amazon.aws.operators.batch import AwsBatchOperator
default_args = {
'owner': 'admin',
'concurrency': 3,
'depends_on_past': True,
'email': ['airflow@example.com'],
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=5),
'start_date': None,
'end_date': None,
'schedule_interval': None,
}
dag = DAG(
dag_id='my-dag',
default_args=default_args,
description='My DAG',
schedule_interval='00 03 * * *',
start_date=days_ago(1),
tags=['dev'],
)
task = AwsBatchOperator(
dag=dag,
job_name= 'my-job-name',
job_definition= 'arn:aws:batch:eu-central-1:XXXX:job-definition/my-job-name',
job_queue= 'arn:aws:batch:eu-central-1:XXXX:job-queue/my-job-name',
region_name= 'eu-central-1',
task_id= 'my-task-id',
overrides={
'command': ['python3', './my_python_script.py']
},
parameters= {}
)
python脚本my_python_script.py
在部署airflow的EC2机器上,目录/home/ubuntu
.
我在这个 python 脚本中出现了错误。我更正了它并将更正后的脚本推送到 EC2 机器上。但是,当我执行 DAG 时,我仍然收到由我更正的拼写错误引起的错误。所以这是我的问题:
如何刷新我的 DAG 以确保它使用我的 EC2 机器上存在的脚本版本?
我试过的
- 点击 Airflow 网页界面上的“刷新”按钮刷新 DAG
- 等待 airflow-scheduler 自动刷新 DAG
- 删除DAG等待刷新
- 使用命令
python -m compileall
在 EC2 机器上重新编译 python 脚本
要更新 AwsBatchOperator
任务的代码,您需要使用新代码而不是 EC2 机器上的代码更新 AWS 批处理作业使用的 docker 图像部署气流的地方
AwsBatchOperator
可以执行 AWS 批处理作业定义 docker 图像中存在的代码,但无法执行部署了 airflow 的 EC2 机器中存在的代码。
当您使用 AwsBatchOperator
时,您可以设置要使用的作业定义。就我而言,它是 arn:aws:batch:eu-central-1:XXXX:job-definition/my-job-name
。此作业定义包含将在其中执行命令的 docker 图像。参见 https://docs.aws.amazon.com/en_en/batch/latest/userguide/Batch_GetStarted.html
我很困惑,因为它与 EC2 机器和 docker 图像上存在的代码库相同。因此,无需在部署了气流的 EC2 机器上更新代码,我只需使用新版本的代码创建一个新的 docker 图像,并让我的 AWS 批处理作业使用这个新图像。
我在 AWS 上的 EC2 机器上部署了 airflow-scheduler 和 airflow-webserver。我使用这个 airflow-scheduler 来执行带有 AwsBatchOperator
任务的 DAG。此任务执行 EC2 机器上存在的 python 脚本。这是 DAG 的代码:
from datetime import timedelta
from airflow import DAG
from airflow.utils.dates import days_ago
from airflow.providers.amazon.aws.operators.batch import AwsBatchOperator
default_args = {
'owner': 'admin',
'concurrency': 3,
'depends_on_past': True,
'email': ['airflow@example.com'],
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=5),
'start_date': None,
'end_date': None,
'schedule_interval': None,
}
dag = DAG(
dag_id='my-dag',
default_args=default_args,
description='My DAG',
schedule_interval='00 03 * * *',
start_date=days_ago(1),
tags=['dev'],
)
task = AwsBatchOperator(
dag=dag,
job_name= 'my-job-name',
job_definition= 'arn:aws:batch:eu-central-1:XXXX:job-definition/my-job-name',
job_queue= 'arn:aws:batch:eu-central-1:XXXX:job-queue/my-job-name',
region_name= 'eu-central-1',
task_id= 'my-task-id',
overrides={
'command': ['python3', './my_python_script.py']
},
parameters= {}
)
python脚本my_python_script.py
在部署airflow的EC2机器上,目录/home/ubuntu
.
我在这个 python 脚本中出现了错误。我更正了它并将更正后的脚本推送到 EC2 机器上。但是,当我执行 DAG 时,我仍然收到由我更正的拼写错误引起的错误。所以这是我的问题:
如何刷新我的 DAG 以确保它使用我的 EC2 机器上存在的脚本版本?
我试过的
- 点击 Airflow 网页界面上的“刷新”按钮刷新 DAG
- 等待 airflow-scheduler 自动刷新 DAG
- 删除DAG等待刷新
- 使用命令
python -m compileall
在 EC2 机器上重新编译 python 脚本
要更新 AwsBatchOperator
任务的代码,您需要使用新代码而不是 EC2 机器上的代码更新 AWS 批处理作业使用的 docker 图像部署气流的地方
AwsBatchOperator
可以执行 AWS 批处理作业定义 docker 图像中存在的代码,但无法执行部署了 airflow 的 EC2 机器中存在的代码。
当您使用 AwsBatchOperator
时,您可以设置要使用的作业定义。就我而言,它是 arn:aws:batch:eu-central-1:XXXX:job-definition/my-job-name
。此作业定义包含将在其中执行命令的 docker 图像。参见 https://docs.aws.amazon.com/en_en/batch/latest/userguide/Batch_GetStarted.html
我很困惑,因为它与 EC2 机器和 docker 图像上存在的代码库相同。因此,无需在部署了气流的 EC2 机器上更新代码,我只需使用新版本的代码创建一个新的 docker 图像,并让我的 AWS 批处理作业使用这个新图像。