第二个 condor_submit_dag 可以由第一个 DAG 中的节点执行吗?

Can a second condor_submit_dag be executed by a node within the first DAG?

我正在使用 Condor 以分布式方式执行大量处理任务。有两个处理阶段。在第一个处理阶段,我执行了 UMPTEEN 次工具来解析源数据的某些子集并将其转换为中间文件。该工具的每次执行都独立于所有其他工具。因此,这很适合用于 Condor。

要注意的是,该工具可能会决定不输出任何中间文件。因此,我无法先验知道我将拥有多少中间文件;数量可能少于 UMPTEEN。另一个问题是我不知道中间文件的名称是什么;我只有在工具创建文件后才知道文件名。

在第二个处理阶段,我执行其他工具将每个中间文件转换为其他不同格式的目标文件。我也想为此使用 Condor。但是,要为此编写提交描述文件,我需要确切地知道我必须转换多少个中间文件以及它们的文件名是什么。

我尝试的是在我的 stage1 DAG 中有一个 "generate_stage2" 节点,它取决于第一个节点的完成。在 "generate_stage2" 节点中,我 运行 一个 Python 脚本:

但是,提交第二个DAG失败了。我怀疑当我在第一个 DAG 中当前 运行ning 的节点中调用 condor_submit_dag 时,Condor 不喜欢它。

大问题

我的尝试可行吗?有没有办法让一个 DAG 触发另一个 DAG?

例子

以下是我的提交描述文件的示例,希望能解释我的尝试。

第 1 阶段 DAG

JOB 10_src_to_int      work/condor/10_src_to_int
JOB 20_generate_stage2 work/condor/20_generate_stage2

PARENT 10_src_to_int CHILD 20_generate_stage2

10_src_to_int

getenv = true
notification = Never
universe = vanilla
run_as_owner = true
initialdir = /foo/somewhere
executable = /bin/src_to_int

# UMPTEEN entries:
arguments = "src_data/ int_data/ --region -45 -123 -44 -122"
queue
arguments = "src_data/ int_data/ --region -46 -123 -45 -122"
queue
...

20_generate_stage2

getenv = true
notification = Never
universe = vanilla
run_as_owner = true
initialdir = /foo/somewhere
executable = /scripts/generate_stage2

arguments = "'data to share'  'between stage1'  'and stage2'"
queue

第 2 阶段 DAG

JOB 30_int_to_dst_a work/condor/30_int_to_abc
JOB 40_int_to_dst_b work/condor/40_int_to_xyz

30_int_to_abc

# Written by the generate_stage2 script which a node in the stage1 DAG executed.
getenv = true
notification = Never
universe = vanilla
run_as_owner = true
initialdir = /foo/somewhere
executable = /bin/int_to_abc

# At most UMPTEEN entries:
arguments = "int_data/S45_W123.int out_data/S45_W123.abc"
queue
arguments = "int_data/S46_W123.int out_data/S46_W123.abc"
queue
...

40_int_to_xyz

# Written by the generate_stage2 script which a node in the stage1 DAG executed.
getenv = true
notification = Never
universe = vanilla
run_as_owner = true
initialdir = /foo/somewhere
executable = /bin/int_to_xyz

# At most UMPTEEN entries:
arguments = "int_data/S45_W123.int out_data/S45_W123.xyz"
queue
arguments = "int_data/S46_W123.int out_data/S46_W123.xyz"
queue
...

(是的,我将源数据子集化为地理空间区域。在示例中,我使用了大约 45° S 123° W 左右的任意坐标,它位于海洋中间。它没有任何意义。)

你想要的是 dag 中的 subdag external 节点:

https://htcondor.readthedocs.io/en/latest/users-manual/dagman-workflows.html?highlight=subdag%20external#composing-workflows-from-multiple-dag-files

一个subdag external对于外部dag来说看起来像是一个普通的dag节点,但它是由运行另一个dagman进程实现的。这个内部 dag 在所有节点依赖项都得到满足之前不会启动,因此您可以让脚本根据外部 dag 的输出创建内部 dag。

我学会了如何用一个 DAG 完成我想要的东西。

我从来没有想过在提交 DAG 时不必为 DAG 中的所有节点编写提交文件。只要节点的提交文件写在节点运行s之前,它就可以工作。

我现在做的是让第一个节点执行生成中间文件的工具。然后,第二个节点执行 Python 脚本,该脚本搜索中间文件并为第三个和第四个节点写入提交文件。最后第三个和第四个节点运行成功

例子

以下是我修改后的提交描述文件的示例。

DAG

JOB 10_src_to_int work/condor/10_src_to_int
JOB 20_find_int   work/condor/20_find_int
JOB 30_int_to_abc work/condor/30_int_to_abc
JOB 40_int_to_xyz work/condor/40_int_to_xyz

PARENT 10_src_to_int CHILD 20_find_int
PARENT 20_find_int CHILD 30_int_to_abc
PARENT 20_find_int CHILD 40_int_to_xyz

10_src_to_int

getenv = true
notification = Never
universe = vanilla
run_as_owner = true
initialdir = /foo/somewhere
executable = /bin/src_to_int

# UMPTEEN entries:
arguments = "src_data/ int_data/ --region -45 -123 -44 -122"
queue
arguments = "src_data/ int_data/ --region -46 -123 -45 -122"
queue
...

20_find_int

getenv = true
notification = Never
universe = vanilla
run_as_owner = true
initialdir = /foo/somewhere
executable = /scripts/find_int

arguments = "'data needed'  'to find'  'intermediate files'"
queue

30_int_to_abc

# Written by the find_int script which the previous node executed.
getenv = true
notification = Never
universe = vanilla
run_as_owner = true
initialdir = /foo/somewhere
executable = /bin/int_to_abc

# At most UMPTEEN entries:
arguments = "int_data/S45_W123.int out_data/S45_W123.abc"
queue
arguments = "int_data/S46_W123.int out_data/S46_W123.abc"
queue
...

40_int_to_xyz

# Written by the find_int script which the previous node executed.
getenv = true
notification = Never
universe = vanilla
run_as_owner = true
initialdir = /foo/somewhere
executable = /bin/int_to_xyz

# At most UMPTEEN entries:
arguments = "int_data/S45_W123.int out_data/S45_W123.xyz"
queue
arguments = "int_data/S46_W123.int out_data/S46_W123.xyz"
queue
...