Airflow 动态生成的任务未按顺序运行
Airflow dynamically generated tasks do not run in order
我创建了一个动态生成任务的 DAG。任务能够正确生成,但它们没有按顺序触发,执行顺序也不一致。
我注意到它们是按字母数字(字典序)顺序触发的。
以 run_modification_ 系列任务为例:我生成了编号 0 到 29 的任务,发现它们按如下顺序触发。
run_modification_0
run_modification_1
run_modification_10
run_modification_11
run_modification_12
run_modification_13
run_modification_14
run_modification_15
run_modification_16
run_modification_17
run_modification_18
run_modification_19
run_modification_2
run_modification_21
run_modification_23....
但我需要它们按任务编号的顺序运行:
run_modification_0
run_modification_1
run_modification_2
run_modification_3
run_modification_4
run_modification_5..
请帮我让这些任务按创建顺序运行。
from datetime import date, timedelta, datetime
from airflow.utils.dates import days_ago
from airflow.models import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.operators.bash_operator import BashOperator
from airflow.operators.postgres_operator import PostgresOperator
from airflow.hooks.postgres_hook import PostgresHook
from airflow.models import Variable
import os
# Default arguments handed to the DAG via ``default_args``.
args = dict(
    owner='Airflow',
    start_date=days_ago(2),
)

# DAG with no schedule interval that hosts the dynamically generated tasks.
dag = DAG(
    dag_id='tastOrder',
    schedule_interval=None,
    default_args=args,
    tags=['task'],
)

# Jinja-templated shell command: pulls the 'taskDateFolder' XCom pushed by the
# run_modification_<i> task (i comes from ``params``) and ``cd``s into it.
modification_processXcom = (
    " cd {{ ti.xcom_pull(task_ids='run_modification_'+params.i,"
    " key='taskDateFolder') }} "
)
def modificationProcess(ds, **kwargs):
    """Push the date folder for day offset ``i`` to XCom.

    Computes the date ``i`` days before the fixed reference date 2021-01-01,
    prints it, and pushes its string form under the XCom key
    ``taskDateFolder`` on the task instance.

    Args:
        ds: Positional argument supplied by the caller; not used here.
        **kwargs: Must contain ``i`` (day offset, int or numeric string) and
            ``ti`` (an object exposing ``xcom_push(key, value)``).
    """
    reference_date = datetime.strptime('2021-01-01', '%Y-%m-%d').date()
    # int() accepts both an int and a numeric string, so no str() round trip
    # is needed before converting.
    day_offset = int(kwargs['i'])
    date_folder = str(reference_date - timedelta(days=day_offset))
    print(date_folder)
    kwargs["ti"].xcom_push("taskDateFolder", date_folder)
def getDays():
    """Return the length of the 30-day window and the fixed reference date.

    Returns:
        tuple: ``(day_Diff, today)`` where ``today`` is the date 2021-01-01
        and ``day_Diff`` is the ``timedelta`` between ``today`` and the date
        30 days before it.
    """
    today = datetime.strptime('2021-01-01', '%Y-%m-%d').date()
    # Start of the window: 30 days before the reference date.
    window_start = today - timedelta(days=30)
    day_Diff = today - window_start
    return day_Diff, today
day_Diff, today = getDays()

# One (run_modification_<i> -> modification_processXcom_<i>) pair per day
# offset, created from the highest index down to 0.
for i in reversed(range(0, day_Diff.days)):
    run_modification = PythonOperator(
        task_id='run_modification_' + str(i),
        provide_context=True,
        python_callable=modificationProcess,
        op_kwargs={'i': str(i)},
        dag=dag,
    )

    # Bind the operator to its own name. Reusing ``modification_processXcom``
    # here would overwrite the command template, so every iteration after the
    # first would pass a BashOperator object as ``bash_command``.
    process_xcom_task = BashOperator(
        task_id='modification_processXcom_' + str(i),
        bash_command=modification_processXcom,
        params={'i': str(i)},
        dag=dag,
    )

    # Only the two tasks of a pair are linked; no dependency is declared
    # between pairs with different indices.
    run_modification >> process_xcom_task
要得到如下依赖关系:
run_modification_1 -> modification_processXcom_1 ->
run_modification_2 -> modification_processXcom_2 -> ... ->
run_modification_29 -> modification_processXcom_29
你可以这样做:
from datetime import datetime
from airflow import DAG
from airflow.operators.bash import BashOperator
# Example DAG: no schedule interval, fixed start date, catchup disabled,
# and not paused when it is first created.
dag = DAG(
    dag_id='my_dag',
    start_date=datetime(2021, 8, 10),
    schedule_interval=None,
    is_paused_upon_creation=False,
    catchup=False,
)
mylist1 = []  # run_modification_<i> tasks, in creation order
mylist2 = []  # modification_processXcom_<i> tasks, in creation order

for i in range(1, 30):
    run_task = BashOperator(  # Replace with your requested operator
        task_id=f'run_modification_{i}',
        bash_command=f"""echo executing run_modification_{i}""",
        dag=dag,
    )
    process_task = BashOperator(  # Replace with your requested operator
        task_id=f'modification_processXcom_{i}',
        bash_command=f"""echo executing modification_processXcom_{i}""",
        dag=dag,
    )

    # Chain the pairs: the previous modification_processXcom task must finish
    # before the current run_modification task starts. The first pair has no
    # predecessor, so the list is still empty and nothing is linked.
    if mylist2:
        mylist2[-1] >> run_task

    # Within a pair, run_modification runs before modification_processXcom.
    run_task >> process_task

    mylist1.append(run_task)
    mylist2.append(process_task)
此代码创建两个 operator 列表,并将其中的任务设置为依次运行,如下所示:
树视图:
我创建了一个动态生成任务的 DAG。任务能够正确生成,但它们没有按顺序触发,执行顺序也不一致。 我注意到它们是按字母数字(字典序)顺序触发的。 以 run_modification_ 系列任务为例:我生成了编号 0 到 29 的任务,发现它们按如下顺序触发。 run_modification_0 run_modification_1 run_modification_10 run_modification_11 run_modification_12 run_modification_13 run_modification_14 run_modification_15 run_modification_16 run_modification_17 run_modification_18 run_modification_19 run_modification_2 run_modification_21 run_modification_23....
但我需要它们按任务编号的顺序运行:
run_modification_0
run_modification_1
run_modification_2
run_modification_3
run_modification_4
run_modification_5..
请帮我让这些任务按创建顺序运行。
from datetime import date, timedelta, datetime
from airflow.utils.dates import days_ago
from airflow.models import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.operators.bash_operator import BashOperator
from airflow.operators.postgres_operator import PostgresOperator
from airflow.hooks.postgres_hook import PostgresHook
from airflow.models import Variable
import os
# Default arguments handed to the DAG via ``default_args``.
args = dict(
    owner='Airflow',
    start_date=days_ago(2),
)

# DAG with no schedule interval that hosts the dynamically generated tasks.
dag = DAG(
    dag_id='tastOrder',
    schedule_interval=None,
    default_args=args,
    tags=['task'],
)

# Jinja-templated shell command: pulls the 'taskDateFolder' XCom pushed by the
# run_modification_<i> task (i comes from ``params``) and ``cd``s into it.
modification_processXcom = (
    " cd {{ ti.xcom_pull(task_ids='run_modification_'+params.i,"
    " key='taskDateFolder') }} "
)
def modificationProcess(ds, **kwargs):
    """Push the date folder for day offset ``i`` to XCom.

    Computes the date ``i`` days before the fixed reference date 2021-01-01,
    prints it, and pushes its string form under the XCom key
    ``taskDateFolder`` on the task instance.

    Args:
        ds: Positional argument supplied by the caller; not used here.
        **kwargs: Must contain ``i`` (day offset, int or numeric string) and
            ``ti`` (an object exposing ``xcom_push(key, value)``).
    """
    reference_date = datetime.strptime('2021-01-01', '%Y-%m-%d').date()
    # int() accepts both an int and a numeric string, so no str() round trip
    # is needed before converting.
    day_offset = int(kwargs['i'])
    date_folder = str(reference_date - timedelta(days=day_offset))
    print(date_folder)
    kwargs["ti"].xcom_push("taskDateFolder", date_folder)
def getDays():
    """Return the length of the 30-day window and the fixed reference date.

    Returns:
        tuple: ``(day_Diff, today)`` where ``today`` is the date 2021-01-01
        and ``day_Diff`` is the ``timedelta`` between ``today`` and the date
        30 days before it.
    """
    today = datetime.strptime('2021-01-01', '%Y-%m-%d').date()
    # Start of the window: 30 days before the reference date.
    window_start = today - timedelta(days=30)
    day_Diff = today - window_start
    return day_Diff, today
day_Diff, today = getDays()

# One (run_modification_<i> -> modification_processXcom_<i>) pair per day
# offset, created from the highest index down to 0.
for i in reversed(range(0, day_Diff.days)):
    run_modification = PythonOperator(
        task_id='run_modification_' + str(i),
        provide_context=True,
        python_callable=modificationProcess,
        op_kwargs={'i': str(i)},
        dag=dag,
    )

    # Bind the operator to its own name. Reusing ``modification_processXcom``
    # here would overwrite the command template, so every iteration after the
    # first would pass a BashOperator object as ``bash_command``.
    process_xcom_task = BashOperator(
        task_id='modification_processXcom_' + str(i),
        bash_command=modification_processXcom,
        params={'i': str(i)},
        dag=dag,
    )

    # Only the two tasks of a pair are linked; no dependency is declared
    # between pairs with different indices.
    run_modification >> process_xcom_task
要得到如下依赖关系:
run_modification_1 -> modification_processXcom_1 ->
run_modification_2 -> modification_processXcom_2 -> ... ->
run_modification_29 -> modification_processXcom_29
你可以这样做:
from datetime import datetime
from airflow import DAG
from airflow.operators.bash import BashOperator
# Example DAG: no schedule interval, fixed start date, catchup disabled,
# and not paused when it is first created.
dag = DAG(
    dag_id='my_dag',
    start_date=datetime(2021, 8, 10),
    schedule_interval=None,
    is_paused_upon_creation=False,
    catchup=False,
)
mylist1 = []  # run_modification_<i> tasks, in creation order
mylist2 = []  # modification_processXcom_<i> tasks, in creation order

for i in range(1, 30):
    run_task = BashOperator(  # Replace with your requested operator
        task_id=f'run_modification_{i}',
        bash_command=f"""echo executing run_modification_{i}""",
        dag=dag,
    )
    process_task = BashOperator(  # Replace with your requested operator
        task_id=f'modification_processXcom_{i}',
        bash_command=f"""echo executing modification_processXcom_{i}""",
        dag=dag,
    )

    # Chain the pairs: the previous modification_processXcom task must finish
    # before the current run_modification task starts. The first pair has no
    # predecessor, so the list is still empty and nothing is linked.
    if mylist2:
        mylist2[-1] >> run_task

    # Within a pair, run_modification runs before modification_processXcom.
    run_task >> process_task

    mylist1.append(run_task)
    mylist2.append(process_task)
此代码创建两个 operator 列表,并将其中的任务设置为依次运行,如下所示:
树视图: