基洛 |高水位线功能

Kylo | High Water Mark functionality

我每五分钟输入 运行 并使用 load/release 蜂巢水印功能。考虑一个场景,其中作业执行时间超过 5 分钟,水印提交没有发生。

在这种情况下,Kylo 会启动另一个带有旧水印的提要实例还是会等待提交发生?

我需要确认这一点,但我的理解是,新的高水位线只会在当前高水位线发布后 运行。请务必确保所有 'success' 和 'failure' 关系都以 ReleaseHighWaterMark 处理器结束。否则,您的 Feed 最终可能会处于不一致的状态,即尚未发布高水位线但无法加载新的高水位线。

如果水印处于活动状态(即流文件加载了水印并正在处理它,但尚未释放),将阻止试图再次加载相同水印的新流文件。它将等待活动水印被释放(通过提交或拒绝)。

您可以通过“Active Water Mark Strategy”属性 在“LoadHighWaterMark”上控制此行为处理器。这些可以帮助处理卡住或花费比预期更长的时间。如果策略设置为'Yield',如果水印处于活动状态,处理器将让步。 yield 发生的次数是通过处理器 属性 'Max Yield Count' 配置的。一旦达到此产量计数,处理器会将流文件路由到“ActiveFailure”关系。每次收益的持续时间可以通过处理器上的 设置 -> 收益持续时间 进行设置。如果策略设置为'Route',处理器将立即将流文件路由到'ActiveFailure'关系。

注意在叶子 SuccessFailureActiveFailure 关系。它支持两种模式 - commitreject.

回答您的具体问题:

如果您的 LoadHighWaterMark 处理器是您的 Feed 流中的第一个处理器(常见情况),那么当它在水位标记处于活动状态的 5 分钟后再次唤醒时,它将实际上什么都不做(在删除任何创建的流文件之后)并再等待 5 分钟以再次安排。处理器将首先屈服,但由于典型的屈服时间少于 5 分钟,因此屈服是无关紧要的。

如果您的 LoadHighWaterMark 处理器不是流中的第一个,则在 5 分钟唤醒后创建的流文件将为 re-queued 并且处理器将屈服或根据配置惩罚该流文件。每次 Feed 处理时间超过 5 分钟时都会发生这种情况。

所以你的问题的答案是no,新的处理不会用旧的water mark恢复,而是会等待当前的water mark 的 commit 或 release without commit。上面的两个案例假设一个关于响应活动水印的典型配置,并且您的提要表现正常,但处理特定批次的数据只需要超过 5 分钟。

请注意,如果您的 Feed 处理数据的平均时间超过 5 分钟,那么将 Feed 的时间表更改为高于 5 分钟的值是明智的,这样处理器的队列就不会备份。同样,只有当 LoadHighWaterMark 不是第一个处理器时才需要这样做。

一般行为:

如果流文件当前正在您的 Feed 的 NiFi 流部分内处理,由您的 LoadHighWaterMarkReleaseHighWaterMark 处理器限制,则否其他流文件可能会进入流的该部分,直到当前处理流文件通过 ReleaseHighWaterMark 处理器退出该部分;水印是否提交。您的流程序列被视为关键部分。这就是为什么每个流程路径,无论是成功路径还是失败路径,都必须通过某种 ReleaseHighWaterMark

现在,当您的提要唤醒并尝试在水位标记处于活动状态时处理新的流文件时,NiFi 流行为由 LoadHighWaterMark 在您的流中的位置决定及其配置。 Active Water Mark Strategy 设置会影响处理器在流文件到达且水印处于活动状态时的行为方式:

  • YIELD - 流程文件将被删除(如果是第一个处理器)或 re-queued 并且处理器将在其指定的屈服时间内屈服
  • PENALIZE - 流文件将被惩罚;导致它成为 re-queued
  • ROUTE - 流文件将立即路由到 activeFailure 关系

请注意,如果 LoadHighWaterMark 处理器是流程中的第一个处理器(不涉及队列),则 PENALIZE 没有意义。因此,将 Active Water Mark Strategy 设置为 PENALIZE 将被视为设置为 YIELD

影响行为的另一个配置设置是最大产量计数。此值指定在将流文件路由到 activeFailure 关系之前应尝试加载和处理活动水印的次数。因此,在您的场景中,如果最大计数设置为 3,并且当前的提要处理时间超过 20 分钟(5 分钟 X 4),那么从第四个开始的所有流文件将立即路由到 activeFailure 直到活动水印处理完成。届时,尝试计数将重置回 0,下一个到达的流文件将开始使用新的水位标记值进行处理。