DAG source return false on emitFromTraverser 和处理器在开始处理之前等待源加载的所有元素

DAG source return false on emitFromTraverser and processor wait for all element loaded by source before start processing

用例

HazelcastJet 版本 0.6.1 Hazelcast 版本 3.10.2

给定这个(简化版)DAG

顶点数

S1 发出 5 个 A 类型项的源(从带分区的数据库中读取) 本地并行度 = 1

S2 发出 150K 类型 B 项的源(迭代器以 100 批次从 DB 中读取并进行分区) 本地并行度 = 1

AD 适应类型 A->A1 和 B->B1 并一个一个发出的处理器

FA Processors.filterP 只接受 A1 类型的项目并一个一个地发出

FB Processors.filterP 只接受 B1 类型的项目并一个一个地发出

CL 处理器首先累积所有类型 A1 的项目,然后当它收到类型 B1 的项目时,用从适当的 A1 获得的一些工作人员丰富它,然后一个接一个地发出。

WR 写入 B1 的接收器 本地并行度 = 1

注意: 只是为了给过滤器处理器赋予意义:在 DAG 中,还有其他来源流入同一适配器 AD,然后使用过滤器处理器进入其他路径。

边缘

S1 --> AD

S2 --> AD

AD --> FA(从序号 0 开始)

AD --> FB(从序数 1)

FA --> CL(分发和广播优先级为 0 的序号 0)

FB --> CL(优先级为 1 的序号 1)

CL --> WR

问题

如果源 S2 有 "few" 个要加载的项目(即 15K),emitFromTraverser 永远不会 returns false。

如果源 S2 有 "many" 个项目要加载(即 150K)emitFromTraverser returns false after:

S2代码供参考:

protected void init(Context context) throws Exception {
    super.init(context);
    this.iterator = new BQueryIterator(querySupplier, batchSize);
    this.traverser = Traversers.traverseIterator(this.iterator);
}

public boolean complete() {
    boolean result = emitFromTraverser(this.traverser);
    return result;
}

问题

更新

似乎从未调用过 CL 边 1 上的 completeEdge。 谁能告诉我为什么?

谢谢!

过了一会儿我明白了问题是什么...

CL 处理器无法知道何时处理完所有 A1 项目,因为所有项目都来自 AD 处理器。 所以在开始处理B1项之前需要等待所有来自AD的源。

不确定,但可能在加载了很多项 B 之后,DAG 中的所有收件箱缓冲区都变满了,无法接受来自 S2 的任何其他 B,但同时无法处理 B1 项以继续:就是这样死锁。

也许 DAG 能够检测到这一点? 我对 Jet 的了解不是很深,但如果能收到这个警告就好了。

也许可以启用一些日志记录?

希望有人能证实我的回答,并建议如何改进和检测这些问题。

您遇到了优先级导致的死锁。您的 DAG 从 AD 分支,然后在 CL 中重新加入,但具有优先级。

AD --+---- FA ----+-- CL
      \          /
       +-- FB --+

设置优先级会导致在处理来自较高优先级边缘的所有 项之前,不会处理来自较低优先级边缘的任何项目。 AD 最终会被来自较低优先级路径的背压阻塞,CL 未处理该路径。所以 AD 被阻塞是因为它不能发送到优先级较低的边缘,而 CL 被阻塞是因为它仍在等待来自较高优先级边缘的项目,从而导致死锁。

对于您的情况,您可以通过创建 2 个 AD 个顶点来解决它,每个顶点处理来自一个来源的项目:

S1 --- AD1 ----+--- CL
              /
S2 --- AD2 --+