显式跳过 DAG 时松弛通知
Slack Notification when the DAG is Skipped Explicitly
在 Airflow 中,我知道您可以在 on_success_callback 和 on_failure_callback 上自动发送松弛通知,这在我的例子中已经正常工作。
在我的用例中,我有一个 ETL,如果当前数据为空且工作正常,它会引发 AirflowSkipException。但这会向我的 slack
发送成功通知
我想知道是否有 on_skip_callback 之类的东西,或者有什么方法可以发送通知,告知我的 DAG 在当天被跳过了。
任何帮助都是 appreciated.Thanks
编辑:为我的 ETL 添加了代码参考。数据点是从数据库中获取的,它每天都在变化,有时如果没有要处理的数据,那么数据点将是空的,反之亦然。
def ETL_function():
# Retrieve data code
....
# Validation to check if ETL data is empty
if not datapoints:
print("OUTPUT LOG : ETL Data not found/empty")
print("OUTPUT LOG : ETL skipped due to empty data, Skipping ETL ...... ")
raise AirflowSkipException
# return False
else :
print("OUTPUT LOG : ETL Data found")
print("OUTPUT LOG : ETL continued due data available , Running ETL ...... ")
# return True
# ETL Process code
....
ETL_function_Task = PythonOperator(
task_id='ETL_function',
provide_context=True,
python_callable=fleet_behavior_idling,
on_success_callback=task_success_slack_alert,
dag=dag,
)
嗨@Anindhito Irmandharu,
您可以使用派生自 PythonOperator 的 ShortCircuitOperator
来达到此目的。
def ETL_function():
...
# Validation to check if ETL data is empty
if not datapoints:
print("OUTPUT LOG : ETL Data not found/empty")
print("OUTPUT LOG : ETL skipped due to empty data, Skipping ETL ...... ")
return False
else :
print("OUTPUT LOG : ETL Data found")
print("OUTPUT LOG : ETL continued due data available , Running ETL ...... ")
return True
ETL_function_Task = ShortCircuitOperator(
task_id="ETL_function",
python_callable= ETL_function,
provide_context=True,
dag=dag,
)
ETL_function_Task >> downstream_Tasks
注意:您的下游任务将被跳过,但此任务 'ETL_function_Task' 将进入成功状态。我不确定为什么您需要为成功执行的任务发送松弛通知。
尽管您可以轻松更改
on_success_callback=task_success_slack_alert
根据您的要求。在你正在使用的slack_hook中写一个新的task_skipped_slack_alert
。
在 Airflow 中,我知道您可以在 on_success_callback 和 on_failure_callback 上自动发送松弛通知,这在我的例子中已经正常工作。
在我的用例中,我有一个 ETL,如果当前数据为空且工作正常,它会引发 AirflowSkipException。但这会向我的 slack
发送成功通知我想知道是否有 on_skip_callback 之类的东西,或者有什么方法可以发送通知,告知我的 DAG 在当天被跳过了。
任何帮助都是 appreciated.Thanks
编辑:为我的 ETL 添加了代码参考。数据点是从数据库中获取的,它每天都在变化,有时如果没有要处理的数据,那么数据点将是空的,反之亦然。
def ETL_function():
# Retrieve data code
....
# Validation to check if ETL data is empty
if not datapoints:
print("OUTPUT LOG : ETL Data not found/empty")
print("OUTPUT LOG : ETL skipped due to empty data, Skipping ETL ...... ")
raise AirflowSkipException
# return False
else :
print("OUTPUT LOG : ETL Data found")
print("OUTPUT LOG : ETL continued due data available , Running ETL ...... ")
# return True
# ETL Process code
....
ETL_function_Task = PythonOperator(
task_id='ETL_function',
provide_context=True,
python_callable=fleet_behavior_idling,
on_success_callback=task_success_slack_alert,
dag=dag,
)
嗨@Anindhito Irmandharu,
您可以使用派生自 PythonOperator 的 ShortCircuitOperator
来达到此目的。
def ETL_function():
...
# Validation to check if ETL data is empty
if not datapoints:
print("OUTPUT LOG : ETL Data not found/empty")
print("OUTPUT LOG : ETL skipped due to empty data, Skipping ETL ...... ")
return False
else :
print("OUTPUT LOG : ETL Data found")
print("OUTPUT LOG : ETL continued due data available , Running ETL ...... ")
return True
ETL_function_Task = ShortCircuitOperator(
task_id="ETL_function",
python_callable= ETL_function,
provide_context=True,
dag=dag,
)
ETL_function_Task >> downstream_Tasks
注意:您的下游任务将被跳过,但此任务 'ETL_function_Task' 将进入成功状态。我不确定为什么您需要为成功执行的任务发送松弛通知。 尽管您可以轻松更改
on_success_callback=task_success_slack_alert
根据您的要求。在你正在使用的slack_hook中写一个新的task_skipped_slack_alert
。