如何使用 Azkaban Python API 添加流作为 Azkaban 作业的依赖项?

How to add a flow as a dependency for an Azkaban job using the Azkaban Python API?

我有一个工作 A 需要调用 Azkaban 流程 "F" 作为依赖项。如何提及作业 A 对流程 F 的依赖?

这是我现在拥有的与获取远程存储流相关的内容"F":

session = remote.Session("user@https://AZKABANURL")
workflows = session.get_workflows("FlowFProjectName")
flows = workflows[u"flows"]
flow_id = flows[0]["flowId"]

workflows = session.get_workflow_info("FlowFProjectName", flow_id)
node_id = workflows["nodes"][0]["id"]

现在我有了 node_id,它是流程 F 中最后一个作业的名称,我如何在作业 A 中添加流程 F 的依赖项?是这样吗?

jobs["A"] = {
    "type": "command", 
    "command": 'echo "Hello World"', 
    "dependencies": "F"
}

执行以下操作会导致我在上传到 Azkaban 时出错(通过将此作业 A 捆绑在项目中):

jobs["a"] = Job({"type": "command", "command": 'echo "Hello World"',"dependencies": node_id})

这是错误:

azkaban.util.AzkabanError: Installation Failed.
Error found in upload. Cannot upload.
a cannot find dependency <node_id>

这里node_id是我隐藏的作业的真实名称

有人可以建议我在作业中添加这些对外部流程的依赖吗?外流在Azkaban(这就是为什么我要用Azkaban.remote)。

我找到了问题的答案:

  1. 调用远程流程并等待它完成(在 while 循环中)
  2. 使用使流 F 调用其依赖作业的启动。

选项 1:哪个更容易理解 - 您使用 while 循环不断询问 Azkaban 特定的 job/flow 是否仍然是 运行ning。但是在这样做的过程中,你应该保持你的 while 循环 运行ning 数小时和数小时 + 你检查流是否 运行ning 的方式正在使用 get_running_workflows() 方法。此方法不会 return 流的某个实例是否仍然是 运行ning,而只是 return 所述流的任何实例是否是 运行ning。

选项 2:如果流程 F 在作业 f 中结束并且作业 A 需要 运行 在流程 F 完成执行后,您将作业添加到流程 F 的末尾,比如 f' 所以f' 将调用作业 A。

如果这很难理解:

原始作业图: 作业 A 所依赖的流程 F: f1 -> f2 -> ... f

添加启动作业后: 流量 f': f1 -> f2 -> ... f -> f'

在这里,f' 应包括 session.run_workflow(project_A, flow_A)

这是比选项 1 更好的方法,因为您可以肯定地知道作业 A 只会在流程 f 成功执行后才会启动。 我希望这对以后的人有所帮助。