Airflow S3KeySensor - 如何让它继续 运行
Airflow S3KeySensor - How to make it continue running
在 的帮助下,我刚刚制作了一个程序(post 中显示的程序),当文件被放置在 S3 存储桶中时,我的 [=34] 中的一个任务=]ning DAGs 被触发,然后我使用 BashOperator 执行一些工作。完成后,虽然 DAG 不再处于 运行ning 状态,而是进入成功状态,如果我想让它获取另一个文件,我需要清除所有 'Past'、'Future'、'Upstream'、'Downstream' activity。我想制作此程序,使其始终 运行ning,并且只要将新文件放入 S3 存储桶中,程序就会启动任务。
我可以继续使用 S3KeySenor 来执行此操作,还是我需要找出一种方法来设置 External Trigger 到 运行 我的 DAG?到目前为止,如果我的 S3KeySensor 只会 运行 一次,那么它就毫无意义了。
from airflow import DAG
from airflow.operators import SimpleHttpOperator, HttpSensor, EmailOperator, S3KeySensor
from datetime import datetime, timedelta
from airflow.operators.bash_operator import BashOperator
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime(2018, 5, 29),
'email': ['something@here.com'],
'email_on_failure': False,
'email_on_retry': False,
'retries': 5,
'retry_delay': timedelta(minutes=5)
}
dag = DAG('s3_triggered_emr_cluster_dag', default_args=default_args, schedule_interval= '@once')
# This Activity runs a Python script that creates an AWS EMR cluster and then does EMR activity on the EMR cluster.
t2 = BashOperator(
task_id='create_emr_cluster_1',
bash_command='python /home/ec2-user/aws-python-sample/Create_EMR_Then_Do_EMR_Activities.py',
retries=1,
dag=dag)
t1 = BashOperator(
task_id='success_log',
bash_command='echo "Dag ran successfully" >> /home/ec2-user/s3_triggered_dag.txt',
dag=dag)
sensor = S3KeySensor(
task_id='new_s3_file_in_foobar-bucket',
bucket_key='*',
wildcard_match=True,
bucket_name='foobar-bucket',
s3_conn_id='s3://foobar-bucket',
timeout=18*60*60,
poke_interval=120,
dag=dag)
t1.set_upstream(sensor)
t2.set_upstream(t1)
我想知道这是否不可能,因为它不会是有向无环图,而是会有一个重复的循环 sensor -> t1 -> t2 -> sensor - > t1 -> t2 -> 传感器 -> ... 不断重复。
更新:
我的用例非常简单,只要将新文件放入指定的 AWS S3 存储桶中,我就希望触发 DAG 并开始执行各种任务。这些任务将执行一些操作,例如实例化一个新的 AWS EMR 集群、从 AWS S3 存储桶中提取文件、执行一些 AWS EMR 活动,然后关闭 AWS EMR 集群。从那里 DAG 将回到等待状态,等待新文件到达 AWS S3 存储桶,然后无限期地重复该过程。
在 Airflow 中,没有映射到始终 运行ning DAG 的概念。如果适合您的用例,您可以非常频繁地使用 DAG 运行,例如每 1 到 5 分钟一次。
这里的主要内容是 S3KeySensor 检查直到它检测到密钥的通配符路径中存在第一个文件(或超时),然后它 运行s。但是,当第二个、第三个或第四个文件着陆时,S3 传感器将已经为该 DAG 运行 完成 运行ning。在下一个 DAG 运行 之前,它不会再次安排到 运行。 (你描述的循环想法大致等同于调度程序在创建 DAG 运行s 时所做的事情,但不是永远。)
外部触发器听起来绝对是您用例的最佳方法,无论该触发器是通过 Airflow CLI 的 trigger_dag 命令 ($ airflow trigger_dag ...
):
或通过 REST API:
双方转身调用common中的trigger_dag
函数(实验)API:
例如,您可以设置一个 AWS Lambda 函数,当文件到达 S3 时调用,运行触发 DAG 调用。
另一种方法是使用 S3 触发 aws lambda,它将使用 api
调用 DAG
s3 事件 -> aws lambda -> 气流 api
设置 S3 通知以触发 lambda
https://docs.aws.amazon.com/lambda/latest/dg/with-s3.html
气流API
https://airflow.apache.org/docs/apache-airflow/stable/rest-api-ref.html
在
我可以继续使用 S3KeySenor 来执行此操作,还是我需要找出一种方法来设置 External Trigger 到 运行 我的 DAG?到目前为止,如果我的 S3KeySensor 只会 运行 一次,那么它就毫无意义了。
from airflow import DAG
from airflow.operators import SimpleHttpOperator, HttpSensor, EmailOperator, S3KeySensor
from datetime import datetime, timedelta
from airflow.operators.bash_operator import BashOperator
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime(2018, 5, 29),
'email': ['something@here.com'],
'email_on_failure': False,
'email_on_retry': False,
'retries': 5,
'retry_delay': timedelta(minutes=5)
}
dag = DAG('s3_triggered_emr_cluster_dag', default_args=default_args, schedule_interval= '@once')
# This Activity runs a Python script that creates an AWS EMR cluster and then does EMR activity on the EMR cluster.
t2 = BashOperator(
task_id='create_emr_cluster_1',
bash_command='python /home/ec2-user/aws-python-sample/Create_EMR_Then_Do_EMR_Activities.py',
retries=1,
dag=dag)
t1 = BashOperator(
task_id='success_log',
bash_command='echo "Dag ran successfully" >> /home/ec2-user/s3_triggered_dag.txt',
dag=dag)
sensor = S3KeySensor(
task_id='new_s3_file_in_foobar-bucket',
bucket_key='*',
wildcard_match=True,
bucket_name='foobar-bucket',
s3_conn_id='s3://foobar-bucket',
timeout=18*60*60,
poke_interval=120,
dag=dag)
t1.set_upstream(sensor)
t2.set_upstream(t1)
我想知道这是否不可能,因为它不会是有向无环图,而是会有一个重复的循环 sensor -> t1 -> t2 -> sensor - > t1 -> t2 -> 传感器 -> ... 不断重复。
更新:
我的用例非常简单,只要将新文件放入指定的 AWS S3 存储桶中,我就希望触发 DAG 并开始执行各种任务。这些任务将执行一些操作,例如实例化一个新的 AWS EMR 集群、从 AWS S3 存储桶中提取文件、执行一些 AWS EMR 活动,然后关闭 AWS EMR 集群。从那里 DAG 将回到等待状态,等待新文件到达 AWS S3 存储桶,然后无限期地重复该过程。
在 Airflow 中,没有映射到始终 运行ning DAG 的概念。如果适合您的用例,您可以非常频繁地使用 DAG 运行,例如每 1 到 5 分钟一次。
这里的主要内容是 S3KeySensor 检查直到它检测到密钥的通配符路径中存在第一个文件(或超时),然后它 运行s。但是,当第二个、第三个或第四个文件着陆时,S3 传感器将已经为该 DAG 运行 完成 运行ning。在下一个 DAG 运行 之前,它不会再次安排到 运行。 (你描述的循环想法大致等同于调度程序在创建 DAG 运行s 时所做的事情,但不是永远。)
外部触发器听起来绝对是您用例的最佳方法,无论该触发器是通过 Airflow CLI 的 trigger_dag 命令 ($ airflow trigger_dag ...
):
或通过 REST API:
双方转身调用common中的trigger_dag
函数(实验)API:
例如,您可以设置一个 AWS Lambda 函数,当文件到达 S3 时调用,运行触发 DAG 调用。
另一种方法是使用 S3 触发 aws lambda,它将使用 api
调用 DAGs3 事件 -> aws lambda -> 气流 api
设置 S3 通知以触发 lambda
https://docs.aws.amazon.com/lambda/latest/dg/with-s3.html
气流API
https://airflow.apache.org/docs/apache-airflow/stable/rest-api-ref.html