Include time in Airflow DAG
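I am new to Airflow and I want to create a DAG that runs every hour. We have a process, importdata, that imports files, followed by a process, sendReport, that sends us a report twice a day, at 8 AM and 6 PM.
How can I include the time?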
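EDIT: I missed the part of your question where you said twice a day. Note that you cannot require something to happen at exactly 8 AM or exactly 6 PM, because execution depends on system resources. The DAG may actually be running at 8:20, 8:32, and so on. But since we are scheduling an hourly job, we know there should be exactly one run between 8 AM and 9 AM, so we only verify that the time range is met, and if it is, we execute sendReport.
Updated code: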
import datetime

from airflow.models import DAG
from airflow.operators.dummy import DummyOperator
from airflow.operators.python import ShortCircuitOperator
from airflow.utils.edgemodifier import Label


def time_in_range(start, end, x):
    """Return true if x is in the range [start, end]"""
    if start <= end:
        return start <= x <= end
    else:
        # The range crosses midnight (for example 22:00 to 02:00)
        return start <= x or x <= end


def shortcircuit_fn():
    # Read the clock once so both windows are checked against the same time
    now = datetime.datetime.now().time()
    return time_in_range(
        datetime.time(8, 0, 0),
        datetime.time(9, 0, 0),
        now,
    ) or time_in_range(
        datetime.time(18, 0, 0),
        datetime.time(19, 0, 0),
        now,
    )


with DAG(
    dag_id="with_short_circuit_twice_a_day",
    schedule_interval='@hourly',
    start_date=datetime.datetime(2021, 7, 17),
    catchup=False
) as dag:
    first_op = DummyOperator(task_id='importdata')  # Replace with your operator
    # Downstream tasks are skipped when the callable returns False
    short_op = ShortCircuitOperator(
        task_id='short_circuit',
        python_callable=shortcircuit_fn
    )
    send_op = DummyOperator(task_id='sendReport')  # Replace with your operator

    first_op >> short_op >> Label("8<time<9 or 18<time<19") >> send_op
[Screenshot: execution outside the time window]
[Screenshot: execution inside the time window]
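Because shortcircuit_fn reads the wall clock directly, it is awkward to check without waiting for the right hour. The following is a minimal sketch, not part of the original answer: the helper in_report_window and the REPORT_WINDOWS tuple are hypothetical names, and the windows are treated as half-open (start included, end excluded), which differs slightly from the inclusive range used above.

```python
import datetime

# Hypothetical report windows: start inclusive, end exclusive (an assumption of this sketch).
REPORT_WINDOWS = (
    (datetime.time(8, 0), datetime.time(9, 0)),
    (datetime.time(18, 0), datetime.time(19, 0)),
)


def in_report_window(now, windows=REPORT_WINDOWS):
    """Return True if `now` (a datetime.time) falls inside any of the windows."""
    return any(start <= now < end for start, end in windows)


def shortcircuit_fn():
    # Same role as the callable passed to ShortCircuitOperator above;
    # the clock is read once and the decision is delegated to the pure helper.
    return in_report_window(datetime.datetime.now().time())
```

With this split, a run that starts at 8:20 passes the check and a run that starts at 9:00 does not, so an hourly schedule would be expected to let through one run per window.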
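Previous answer: only relevant if you want to send the report on all runs between 8 AM and 6 PM.
There are two ways to get this functionality.
You can use BranchDateTimeOperator to check whether the job is running within the desired time range: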
import datetime

from airflow.models import DAG
from airflow.operators.dummy import DummyOperator
from airflow.operators.datetime import BranchDateTimeOperator
from airflow.utils.edgemodifier import Label

with DAG(
    dag_id="with_branching",
    schedule_interval='@hourly',
    start_date=datetime.datetime(2021, 7, 17),
    catchup=False
) as dag:
    first_op = DummyOperator(task_id='importdata')  # Replace with your operator
    # Follow 'sendReport' when the current time is between target_lower and
    # target_upper, otherwise follow 'do_nothing'
    branch_op = BranchDateTimeOperator(
        task_id='branch',
        follow_task_ids_if_true='sendReport',
        follow_task_ids_if_false='do_nothing',
        target_upper=datetime.time(18, 0, 0),
        target_lower=datetime.time(8, 0, 0),
    )
    in_range_op = DummyOperator(task_id='sendReport')  # Replace with your operator
    out_of_range_op = DummyOperator(task_id='do_nothing')

    first_op >> branch_op >> Label("8<time<18") >> in_range_op
    branch_op >> Label("rest of day") >> out_of_range_op
[Screenshot: execution outside the time window]
[Screenshot: execution inside the time window]
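To make the branch decision concrete, here is a plain-Python sketch of the choice between the two task ids. It is not the implementation of BranchDateTimeOperator; the helper name choose_branch and the inclusive bounds are assumptions made for illustration only.

```python
import datetime


def choose_branch(now, lower=datetime.time(8, 0), upper=datetime.time(18, 0)):
    """Hypothetical helper: return the task_id to follow for a given time of day."""
    # Inclusive bounds are an assumption of this sketch.
    if lower <= now <= upper:
        return "sendReport"
    return "do_nothing"


print(choose_branch(datetime.time(10, 15)))  # prints "sendReport"
print(choose_branch(datetime.time(21, 0)))   # prints "do_nothing"
```

Exactly one of the two downstream tasks is followed on each run, which is why the DAG above needs the extra do_nothing task.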
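This is a good solution if you need to actually perform different tasks in each branch of the workflow. If that is not the case, you should probably use the second option, ShortCircuitOperator. With it, the workflow continues to sendReport only if the time criteria are met, and skips it otherwise: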
import datetime

from airflow.models import DAG
from airflow.operators.dummy import DummyOperator
from airflow.operators.python import ShortCircuitOperator
from airflow.utils.edgemodifier import Label


def time_in_range(start, end, x):
    """Return true if x is in the range [start, end]"""
    if start <= end:
        return start <= x <= end
    else:
        # The range crosses midnight (for example 22:00 to 02:00)
        return start <= x or x <= end


def shortcircuit_fn():
    # True only while the current time is between 08:00 and 18:00
    return time_in_range(
        datetime.time(8, 0, 0),
        datetime.time(18, 0, 0),
        datetime.datetime.now().time(),
    )


with DAG(
    dag_id="with_short_circuit",
    schedule_interval='@hourly',
    start_date=datetime.datetime(2021, 7, 17),
    catchup=False
) as dag:
    first_op = DummyOperator(task_id='importdata')  # Replace with your operator
    # Downstream tasks are skipped when the callable returns False
    short_op = ShortCircuitOperator(
        task_id='short_circuit',
        python_callable=shortcircuit_fn
    )
    send_op = DummyOperator(task_id='sendReport')  # Replace with your operator

    first_op >> short_op >> Label("8<time<18") >> send_op
[Screenshot: execution outside the time window]
[Screenshot: execution inside the time window]
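The else branch of time_in_range is never exercised by the windows used in this answer, since each start is earlier than its end. As a small illustration, the sketch below copies the function and calls it with a window that crosses midnight; the 22:00 to 02:00 window is a made-up example, not something from the question.

```python
import datetime


def time_in_range(start, end, x):
    """Return True if x is in the range [start, end]; the range may cross midnight."""
    if start <= end:
        return start <= x <= end
    # start > end means the window wraps past midnight
    return start <= x or x <= end


# Hypothetical overnight window, for illustration only
night_start = datetime.time(22, 0)
night_end = datetime.time(2, 0)

print(time_in_range(night_start, night_end, datetime.time(23, 30)))  # prints True
print(time_in_range(night_start, night_end, datetime.time(1, 0)))    # prints True
print(time_in_range(night_start, night_end, datetime.time(12, 0)))   # prints False
```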