如何仅当新 partition/data 在 AWS athena table 中使用 python 中的 DAG 可用时才触发 Airflow 任务?
How to trigger a Airflow task only when new partition/data in avialable in the AWS athena table using DAG in python?
我有如下场景:
- 仅当源 table ( Athena) 中的新数据可用时才触发
Task 1
和 Task 2
。 Task1 和 Task2 的触发应该在一天中有一个新的数据分区时发生。
- 仅在
Task 1
和 Task 2
完成时触发 Task 3
- 触发
Task 4
只完成Task 3
我的代码
from airflow import DAG
from airflow.contrib.sensors.aws_glue_catalog_partition_sensor import AwsGlueCatalogPartitionSensor
from datetime import datetime, timedelta
from airflow.operators.postgres_operator import PostgresOperator
from utils import FAILURE_EMAILS
yesterday = datetime.combine(datetime.today() - timedelta(1), datetime.min.time())
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': yesterday,
'email': FAILURE_EMAILS,
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=5)
}
dag = DAG('Trigger_Job', default_args=default_args, schedule_interval='@daily')
Athena_Trigger_for_Task1 = AwsGlueCatalogPartitionSensor(
task_id='athena_wait_for_Task1_partition_exists',
database_name='DB',
table_name='Table1',
expression='load_date={{ ds_nodash }}',
timeout=60,
dag=dag)
Athena_Trigger_for_Task2 = AwsGlueCatalogPartitionSensor(
task_id='athena_wait_for_Task2_partition_exists',
database_name='DB',
table_name='Table2',
expression='load_date={{ ds_nodash }}',
timeout=60,
dag=dag)
execute_Task1 = PostgresOperator(
task_id='Task1',
postgres_conn_id='REDSHIFT_CONN',
sql="/sql/flow/Task1.sql",
params={'limit': '50'},
trigger_rule='all_success',
dag=dag
)
execute_Task2 = PostgresOperator(
task_id='Task2',
postgres_conn_id='REDSHIFT_CONN',
sql="/sql/flow/Task2.sql",
params={'limit': '50'},
trigger_rule='all_success',
dag=dag
)
execute_Task3 = PostgresOperator(
task_id='Task3',
postgres_conn_id='REDSHIFT_CONN',
sql="/sql/flow/Task3.sql",
params={'limit': '50'},
trigger_rule='all_success',
dag=dag
)
execute_Task4 = PostgresOperator(
task_id='Task4',
postgres_conn_id='REDSHIFT_CONN',
sql="/sql/flow/Task4",
params={'limit': '50'},
dag=dag
)
execute_Task1.set_upstream(Athena_Trigger_for_Task1)
execute_Task2.set_upstream(Athena_Trigger_for_Task2)
execute_Task3.set_upstream(execute_Task1)
execute_Task3.set_upstream(execute_Task2)
execute_Task4.set_upstream(execute_Task3)
实现它的最佳方式是什么?
我相信你的问题解决了两个主要问题:
- 忘记以显式方式配置
schedule_interval
,所以 @daily 正在设置一些你不期望的东西。
- 当您依赖外部事件完成执行时,如何正确触发和重试 dag 的执行
简短的回答:明确设置你的 schedule_interval 使用 cron 作业格式并使用传感器操作员不时检查
default_args={
'retries': (endtime - starttime)*60/poke_time
}
dag = DAG('Trigger_Job', default_args=default_args, schedule_interval='0 10 * * *')
Athena_Trigger_for_Task1 = AwsGlueCatalogPartitionSensor(
....
poke_time= 60*5 #<---- set a poke_time in seconds
dag=dag)
其中 startime
是您的日常任务开始的时间,endtime
一天中您应该在标记为失败之前检查事件是否完成的最后时间是什么,poke_time
是您的 sensor_operator
将检查事件是否发生的时间间隔。
如何明确处理 cron 作业
每当您将 dag 设置为 @daily
时,就像您所做的那样:
dag = DAG('Trigger_Job', default_args=default_args, schedule_interval='@daily')
从 docs,你可以看到你实际上在做:
@daily - Run once a day at midnight
现在可以理解为什么您会收到超时错误,并且会在 5 分钟后失败,因为您设置了 'retries': 1
和 'retry_delay': timedelta(minutes=5)
。所以它在午夜尝试 运行ning dag,但失败了。 5 分钟后再次重试并再次失败,因此它标记为失败。
所以基本上 @daily 运行 正在设置一个隐式的 cron 作业:
@daily -> Run once a day at midnight -> 0 0 * * *
cron 作业格式为以下格式,您可以随时将值设置为 *
"all"。
Minute Hour Day_of_Month Month Day_of_Week
所以@daily 基本上是说 运行 这个每:所有月的所有 days_of_month 分钟 0 小时 0 所有 days_of_week
所以您的情况是 运行 每隔:所有 days_of_week 的 all_months 的所有 days_of_month 的第 0 小时第 10 分钟。这在 cron 作业格式中转换为:
0 10 * * *
当依赖外部事件完成执行时,如何正确触发和重试dag的执行
您可以使用命令 airflow trigger_dag
从外部事件触发气流中的阻力。如果您可以通过某种方式触发 lambda 函数/python 脚本来定位您的气流实例,这将是可能的。
如果你不能从外部触发 dag,那么使用像 OP 那样的传感器运算符,给它设置一个 poke_time 并设置一个合理的高重试次数。
我有如下场景:
- 仅当源 table ( Athena) 中的新数据可用时才触发
Task 1
和Task 2
。 Task1 和 Task2 的触发应该在一天中有一个新的数据分区时发生。 - 仅在
Task 1
和Task 2
完成时触发 - 触发
Task 4
只完成Task 3
Task 3
我的代码
from airflow import DAG
from airflow.contrib.sensors.aws_glue_catalog_partition_sensor import AwsGlueCatalogPartitionSensor
from datetime import datetime, timedelta
from airflow.operators.postgres_operator import PostgresOperator
from utils import FAILURE_EMAILS
yesterday = datetime.combine(datetime.today() - timedelta(1), datetime.min.time())
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': yesterday,
'email': FAILURE_EMAILS,
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=5)
}
dag = DAG('Trigger_Job', default_args=default_args, schedule_interval='@daily')
Athena_Trigger_for_Task1 = AwsGlueCatalogPartitionSensor(
task_id='athena_wait_for_Task1_partition_exists',
database_name='DB',
table_name='Table1',
expression='load_date={{ ds_nodash }}',
timeout=60,
dag=dag)
Athena_Trigger_for_Task2 = AwsGlueCatalogPartitionSensor(
task_id='athena_wait_for_Task2_partition_exists',
database_name='DB',
table_name='Table2',
expression='load_date={{ ds_nodash }}',
timeout=60,
dag=dag)
execute_Task1 = PostgresOperator(
task_id='Task1',
postgres_conn_id='REDSHIFT_CONN',
sql="/sql/flow/Task1.sql",
params={'limit': '50'},
trigger_rule='all_success',
dag=dag
)
execute_Task2 = PostgresOperator(
task_id='Task2',
postgres_conn_id='REDSHIFT_CONN',
sql="/sql/flow/Task2.sql",
params={'limit': '50'},
trigger_rule='all_success',
dag=dag
)
execute_Task3 = PostgresOperator(
task_id='Task3',
postgres_conn_id='REDSHIFT_CONN',
sql="/sql/flow/Task3.sql",
params={'limit': '50'},
trigger_rule='all_success',
dag=dag
)
execute_Task4 = PostgresOperator(
task_id='Task4',
postgres_conn_id='REDSHIFT_CONN',
sql="/sql/flow/Task4",
params={'limit': '50'},
dag=dag
)
execute_Task1.set_upstream(Athena_Trigger_for_Task1)
execute_Task2.set_upstream(Athena_Trigger_for_Task2)
execute_Task3.set_upstream(execute_Task1)
execute_Task3.set_upstream(execute_Task2)
execute_Task4.set_upstream(execute_Task3)
实现它的最佳方式是什么?
我相信你的问题解决了两个主要问题:
- 忘记以显式方式配置
schedule_interval
,所以 @daily 正在设置一些你不期望的东西。 - 当您依赖外部事件完成执行时,如何正确触发和重试 dag 的执行
简短的回答:明确设置你的 schedule_interval 使用 cron 作业格式并使用传感器操作员不时检查
default_args={
'retries': (endtime - starttime)*60/poke_time
}
dag = DAG('Trigger_Job', default_args=default_args, schedule_interval='0 10 * * *')
Athena_Trigger_for_Task1 = AwsGlueCatalogPartitionSensor(
....
poke_time= 60*5 #<---- set a poke_time in seconds
dag=dag)
其中 startime
是您的日常任务开始的时间,endtime
一天中您应该在标记为失败之前检查事件是否完成的最后时间是什么,poke_time
是您的 sensor_operator
将检查事件是否发生的时间间隔。
如何明确处理 cron 作业
每当您将 dag 设置为 @daily
时,就像您所做的那样:
dag = DAG('Trigger_Job', default_args=default_args, schedule_interval='@daily')
从 docs,你可以看到你实际上在做:
@daily - Run once a day at midnight
现在可以理解为什么您会收到超时错误,并且会在 5 分钟后失败,因为您设置了 'retries': 1
和 'retry_delay': timedelta(minutes=5)
。所以它在午夜尝试 运行ning dag,但失败了。 5 分钟后再次重试并再次失败,因此它标记为失败。
所以基本上 @daily 运行 正在设置一个隐式的 cron 作业:
@daily -> Run once a day at midnight -> 0 0 * * *
cron 作业格式为以下格式,您可以随时将值设置为 *
"all"。
Minute Hour Day_of_Month Month Day_of_Week
所以@daily 基本上是说 运行 这个每:所有月的所有 days_of_month 分钟 0 小时 0 所有 days_of_week
所以您的情况是 运行 每隔:所有 days_of_week 的 all_months 的所有 days_of_month 的第 0 小时第 10 分钟。这在 cron 作业格式中转换为:
0 10 * * *
当依赖外部事件完成执行时,如何正确触发和重试dag的执行
您可以使用命令
airflow trigger_dag
从外部事件触发气流中的阻力。如果您可以通过某种方式触发 lambda 函数/python 脚本来定位您的气流实例,这将是可能的。如果你不能从外部触发 dag,那么使用像 OP 那样的传感器运算符,给它设置一个 poke_time 并设置一个合理的高重试次数。