NiFi 如何在 EnforceOrder 不适用时管理乱序?

NiFi how to manage out of order when EnforceOrder is not applicable?

我正在接收可能无序到达的实时数据,有些数据比其他数据晚到达。对于稍后的事件处理,我需要重新排序流文件流,我正在尝试查看在 NiFi 中是否可行。

我对 EnforceOrder 的理解是它需要一个增量整数作为流文件属性,并且知道起点。

由于我正在处理实时流,所以我没有起点,所以我不知道如何让它发挥作用。

我所拥有的是我可以提取的数据中的时间戳,转换为 linux 时间戳,然后写入“优先级”属性。 所以在那之后我用 Prioritizer = PriorityAttributePrioritizer 设置了一个队列。 但是流文件在队列中停留的时间不够长,无法解决乱序问题,因为之后的所有处理器都很快(我正在处理 1200 个流文件/秒)。

a) 有没有办法让队列充当缓冲区,仍然保持速度,只是延迟足够长,以便队列中有相当数量的流文件被优先处理?

b) 或者我们还能如何解决乱序问题?

我为此设计了一个服务员process group。在我的例子中,我不得不等待 10 秒来对我的流程进行排序。下面的进程组基本都是在一定时期内循环流动。

您也可以在 unmatched queue 上应用您的优先级,它应该足够大。

RouteOnAttribute

UpdateAttribute

我这里有一些东西看起来像是在低速下工作,但目前它在高速下崩溃了。

第一个块只是一个每秒生成流文件的模拟器。 UpdateAttribute 可以是生成用于排序的优先级属性的属性。

剩下的就是管爆了。

  • 为每个流文件增加一个计数器
  • 等待计数器达到极限
  • 如果流文件通过,则重置计数器

第一个通知为每个流文件将计数器递增 1。

Wait只有在计数器达到5时才会让flowfile通过 流文件使用PriorityAttributePrioritizer之前的队列,所有其他队列将设置为FIFO。

最后一个将计数器重置为零的通知

我找到了一个解决方案,有两种方法,要么等待特定时间释放一组流文件(解决方案2),要么等待多个流文件(解决方案1)。 我正在使用一种或另一种方法,而不是同时使用这两种方法。

在此处的图片中:flow,我已经在解决方案 1 和解决方案 2 下进行了描述。