运行 另一个带有 TriggerDag运行Operator 多次的 DAG
Run another DAG with TriggerDagRunOperator multiple times
我有一个 DAG (DAG1),我在其中复制了一堆文件。然后我想为每个复制的文件启动另一个 DAG (DAG2)。由于每个 DAG1 运行 复制的文件数量会有所不同,我想基本上循环文件并使用适当的参数调用 DAG2。
例如:
with DAG( 'DAG1',
description="copy files over",
schedule_interval="* * * * *",
max_active_runs=1
) as dag:
t_rsync = RsyncOperator( task_id='rsync_data',
source='/source/',
target='/destination/' )
t_trigger_preprocessing = TriggerDagRunOperator( task_id='trigger_preprocessing',
trigger_daq_id='DAG2',
python_callable=trigger
)
t_rsync >> t_trigger_preprocessing
我希望使用python_callable trigger
从t_rsync
中提取相关的xcom数据,然后触发DAG2;但我不清楚该怎么做。
我更愿意把调用DAG2的逻辑放在这里,简化DAG2的内容(同时提供堆叠原理图max_active_runs
)
最终编写了我自己的运算符:
class TriggerMultipleDagRunOperator(TriggerDagRunOperator):
def execute(self, context):
count = 0
for dro in self.python_callable(context):
if dro:
with create_session() as session:
dbag = DagBag(settings.DAGS_FOLDER)
trigger_dag = dbag.get_dag(self.trigger_dag_id)
dr = trigger_dag.create_dagrun(
run_id=dro.run_id,
state=State.RUNNING,
conf=dro.payload,
external_trigger=True)
session.add(dr)
session.commit()
count = count + 1
else:
self.log.info("Criteria not met, moving on")
if count == 0:
raise AirflowSkipException('No external dags triggered')
python_callable 喜欢
def trigger_preprocessing(context):
for base_filename,_ in found.items():
exp = context['ti'].xcom_pull( task_ids='parse_config', key='experiment')
run_id='%s__%s' % (exp['microscope'], datetime.utcnow().replace(microsecond=0).isoformat())
dro = DagRunOrder(run_id=run_id)
d = {
'directory': context['ti'].xcom_pull( task_ids='parse_config', key='experiment_directory'),
'base': base_filename,
'experiment': exp['name'],
}
LOG.info('triggering dag %s with %s' % (run_id,d))
dro.payload = d
yield dro
return
然后将它们全部绑在一起:
t_trigger_preprocessing = TriggerMultipleDagRunOperator( task_id='trigger_preprocessing',
trigger_dag_id='preprocessing',
python_callable=trigger_preprocessing
)
我有一个 DAG (DAG1),我在其中复制了一堆文件。然后我想为每个复制的文件启动另一个 DAG (DAG2)。由于每个 DAG1 运行 复制的文件数量会有所不同,我想基本上循环文件并使用适当的参数调用 DAG2。
例如:
with DAG( 'DAG1',
description="copy files over",
schedule_interval="* * * * *",
max_active_runs=1
) as dag:
t_rsync = RsyncOperator( task_id='rsync_data',
source='/source/',
target='/destination/' )
t_trigger_preprocessing = TriggerDagRunOperator( task_id='trigger_preprocessing',
trigger_daq_id='DAG2',
python_callable=trigger
)
t_rsync >> t_trigger_preprocessing
我希望使用python_callable trigger
从t_rsync
中提取相关的xcom数据,然后触发DAG2;但我不清楚该怎么做。
我更愿意把调用DAG2的逻辑放在这里,简化DAG2的内容(同时提供堆叠原理图max_active_runs
)
最终编写了我自己的运算符:
class TriggerMultipleDagRunOperator(TriggerDagRunOperator):
def execute(self, context):
count = 0
for dro in self.python_callable(context):
if dro:
with create_session() as session:
dbag = DagBag(settings.DAGS_FOLDER)
trigger_dag = dbag.get_dag(self.trigger_dag_id)
dr = trigger_dag.create_dagrun(
run_id=dro.run_id,
state=State.RUNNING,
conf=dro.payload,
external_trigger=True)
session.add(dr)
session.commit()
count = count + 1
else:
self.log.info("Criteria not met, moving on")
if count == 0:
raise AirflowSkipException('No external dags triggered')
python_callable 喜欢
def trigger_preprocessing(context):
for base_filename,_ in found.items():
exp = context['ti'].xcom_pull( task_ids='parse_config', key='experiment')
run_id='%s__%s' % (exp['microscope'], datetime.utcnow().replace(microsecond=0).isoformat())
dro = DagRunOrder(run_id=run_id)
d = {
'directory': context['ti'].xcom_pull( task_ids='parse_config', key='experiment_directory'),
'base': base_filename,
'experiment': exp['name'],
}
LOG.info('triggering dag %s with %s' % (run_id,d))
dro.payload = d
yield dro
return
然后将它们全部绑在一起:
t_trigger_preprocessing = TriggerMultipleDagRunOperator( task_id='trigger_preprocessing',
trigger_dag_id='preprocessing',
python_callable=trigger_preprocessing
)