具有 2 个传入边的处理器 - 当在一个边上返回 false 时,保持从同一边重新处理并且从不在另一边上处理新项目
Processor with 2 incoming edges - When returning false on one edge, keep re-processing from same edge and never process new items on other edge
我要求确认我对 tryProcess() 逻辑的假设。
详细说明 return 值 (true/false) 如何影响具有 2 个未指定优先级的传入边缘的处理器上的 DAG 工作流。
我的假设是,如果处理器在两个边上都有传入项,并且一个 tryProcess() return 为 false,则将处理另一个边(如果在该边上有更多传入项可用)。
根据哪个边停止处理和接受它们来交替传入项目。
问题
有时会发生一个处理器实例阻塞在总是 returns false 的 tryProcess(#0) 上(因为我们希望从其他边缘处理新项目)。
tryProcess(#0) 被重复调用,而 tryProcess(#1) 从未被调用。
我确定 completeEdge() 从未在处理器上调用过 #0 和 #1 边缘,所以我希望从边缘 #1 开始处理更多项目。
这通常发生在多次 运行 相同的作业之后。
为了更好地解释问题,这是我的用例:
用例
我的数据模型由以下对象组成
- A:由 "ida" 属性识别的对象
- B:由 "idb" 属性标识的对象。它使用 "ida" 值
引用了 A
- AB:耦合B对象及其引用的A对象的对象
我需要将 B 对象与正确引用的 A 对象相匹配,并发出其中的几个。
我有一个采用此设置的 DAG:
顶点数
- S-A:类型为 "A" 的源项目(localParallelism(1),发出按 "ida" 属性排序的 A 对象)
- S-B:类型为 "B" 的源项(localParallelism(1),发出按引用的 "ida" 属性排序的 B 对象)
- C-AB: 将 B 对象与引用的 A 对象相匹配的处理器(发出 AB 对象)
连接数
- S-A -> C-AB:传入边 #0(未指定优先级,按 "ida" 属性分区)
- S-B -> C-AB:传入边 #1(未指定优先级,根据 "ida" 属性进行分区)
环境由具有 2 个节点的 hazelcast jet 集群组成。
逻辑
C-AB 处理器获取第一个 "A" 对象(来自边缘 #0)并保留它,直到处理完与该 "A" 对象相关的所有 "B" 对象。
如果它接收到另一个 "A" 对象,它会在 tryProcess(#0) 中 return false。
当它接收到 "B" 个匹配当前 "A" 的对象(来自边缘 #1)时,它会发出 "AB".
当处理器接收到带有下一个 "A" 对象引用的 "B" 对象时,它会丢弃当前的 "A" 并等待下一个。
如果它在引用 "A" 对象之前接收到 "B" 对象,等待正确的 "A" 匹配 tryProcess(#1) 中的 returning false if收到一个新的 "B"。
这应该可行,因为 S-A 和 S-B 发出正确排序的对象,并且边缘被正确分区以将具有相同 "ida" 值的对象发送到同一处理器。
My assumption is that if the processor have incoming items for both edges, and one tryProcess() return false, the other edge will be processed (if more incoming items are available on such edge).
这个假设是错误的。处理器的行为等同于
for (Object item : inbox) process(item);
但通过协作多线程实现,这就是为什么这个循环必须能够 "suspend" 本身。我们通过让 tryProcess()
return false
.
来实现暂停
执行引擎总是从中断处恢复处理器,并且在收件箱清空之前不会尝试处理任何其他项目。收件箱本身包含从输入队列中取出的一批项目,而不是边缘将在整个作业期间传输的所有项目。
Jet 提供的解决边之间相互依赖的唯一机制是边优先级。如果您需要比这更细粒度的东西,您的处理器应该接受所有项目并在内部缓冲它们,直到满足您的进度条件。
我要求确认我对 tryProcess() 逻辑的假设。
详细说明 return 值 (true/false) 如何影响具有 2 个未指定优先级的传入边缘的处理器上的 DAG 工作流。
我的假设是,如果处理器在两个边上都有传入项,并且一个 tryProcess() return 为 false,则将处理另一个边(如果在该边上有更多传入项可用)。 根据哪个边停止处理和接受它们来交替传入项目。
问题
有时会发生一个处理器实例阻塞在总是 returns false 的 tryProcess(#0) 上(因为我们希望从其他边缘处理新项目)。 tryProcess(#0) 被重复调用,而 tryProcess(#1) 从未被调用。 我确定 completeEdge() 从未在处理器上调用过 #0 和 #1 边缘,所以我希望从边缘 #1 开始处理更多项目。 这通常发生在多次 运行 相同的作业之后。
为了更好地解释问题,这是我的用例:
用例
我的数据模型由以下对象组成
- A:由 "ida" 属性识别的对象
- B:由 "idb" 属性标识的对象。它使用 "ida" 值 引用了 A
- AB:耦合B对象及其引用的A对象的对象
我需要将 B 对象与正确引用的 A 对象相匹配,并发出其中的几个。
我有一个采用此设置的 DAG:
顶点数
- S-A:类型为 "A" 的源项目(localParallelism(1),发出按 "ida" 属性排序的 A 对象)
- S-B:类型为 "B" 的源项(localParallelism(1),发出按引用的 "ida" 属性排序的 B 对象)
- C-AB: 将 B 对象与引用的 A 对象相匹配的处理器(发出 AB 对象)
连接数
- S-A -> C-AB:传入边 #0(未指定优先级,按 "ida" 属性分区)
- S-B -> C-AB:传入边 #1(未指定优先级,根据 "ida" 属性进行分区)
环境由具有 2 个节点的 hazelcast jet 集群组成。
逻辑
C-AB 处理器获取第一个 "A" 对象(来自边缘 #0)并保留它,直到处理完与该 "A" 对象相关的所有 "B" 对象。 如果它接收到另一个 "A" 对象,它会在 tryProcess(#0) 中 return false。
当它接收到 "B" 个匹配当前 "A" 的对象(来自边缘 #1)时,它会发出 "AB".
当处理器接收到带有下一个 "A" 对象引用的 "B" 对象时,它会丢弃当前的 "A" 并等待下一个。
如果它在引用 "A" 对象之前接收到 "B" 对象,等待正确的 "A" 匹配 tryProcess(#1) 中的 returning false if收到一个新的 "B"。
这应该可行,因为 S-A 和 S-B 发出正确排序的对象,并且边缘被正确分区以将具有相同 "ida" 值的对象发送到同一处理器。
My assumption is that if the processor have incoming items for both edges, and one tryProcess() return false, the other edge will be processed (if more incoming items are available on such edge).
这个假设是错误的。处理器的行为等同于
for (Object item : inbox) process(item);
但通过协作多线程实现,这就是为什么这个循环必须能够 "suspend" 本身。我们通过让 tryProcess()
return false
.
执行引擎总是从中断处恢复处理器,并且在收件箱清空之前不会尝试处理任何其他项目。收件箱本身包含从输入队列中取出的一批项目,而不是边缘将在整个作业期间传输的所有项目。
Jet 提供的解决边之间相互依赖的唯一机制是边优先级。如果您需要比这更细粒度的东西,您的处理器应该接受所有项目并在内部缓冲它们,直到满足您的进度条件。