如何在 Airflow 安装中创建触发进程?
How do you create a triggerer process in an Airflow installation?
在 Airflow DAG 中,我正在尝试使用 TimeDeltaTrigger:
from airflow.triggers.temporal import TimeDeltaTrigger
...
self.defer(trigger=TimeDeltaTrigger(timedelta(seconds=15)), method_name="execute")
但是当我的 DAG 运行时,我在 GUI 中收到警告:
在 GUI 中,如果我转到浏览 -> 触发器,我会看到一个触发器,但它不适用于 TimeDeltaTrigger
:
可延迟运算符 (https://airflow.apache.org/docs/apache-airflow/stable/concepts/deferring.html) 的文档说:
Ensure your Airflow installation is running at least one triggerer process, as well as the normal scheduler
但不清楚如何做到这一点。
如何配置我的 Airflow 安装以便我可以使用 TimeDeltaTrigger
?
triggerer
是一个类似于 scheduler
、webserver
和 worker
的过程。您需要启动一个专用于 运行 触发程序以使用可延迟运算符的进程或容器。
要启动触发进程,运行 airflow triggerer
在您的 Airflow 环境中。您应该会看到类似于下图的输出。
触发日志
在 Airflow DAG 中,我正在尝试使用 TimeDeltaTrigger:
from airflow.triggers.temporal import TimeDeltaTrigger
...
self.defer(trigger=TimeDeltaTrigger(timedelta(seconds=15)), method_name="execute")
但是当我的 DAG 运行时,我在 GUI 中收到警告:
在 GUI 中,如果我转到浏览 -> 触发器,我会看到一个触发器,但它不适用于 TimeDeltaTrigger
:
可延迟运算符 (https://airflow.apache.org/docs/apache-airflow/stable/concepts/deferring.html) 的文档说:
Ensure your Airflow installation is running at least one triggerer process, as well as the normal scheduler
但不清楚如何做到这一点。
如何配置我的 Airflow 安装以便我可以使用 TimeDeltaTrigger
?
triggerer
是一个类似于 scheduler
、webserver
和 worker
的过程。您需要启动一个专用于 运行 触发程序以使用可延迟运算符的进程或容器。
要启动触发进程,运行 airflow triggerer
在您的 Airflow 环境中。您应该会看到类似于下图的输出。
触发日志