NiFi 如何在 EnforceOrder 不适用时管理乱序?
NiFi how to manage out of order when EnforceOrder is not applicable?
我正在接收可能无序到达的实时数据,有些数据比其他数据晚到达。对于稍后的事件处理,我需要重新排序流文件流,我正在尝试查看在 NiFi 中是否可行。
我对 EnforceOrder 的理解是它需要一个增量整数作为流文件属性,并且知道起点。
由于我正在处理实时流,所以我没有起点,所以我不知道如何让它发挥作用。
我所拥有的是我可以提取的数据中的时间戳,转换为 linux 时间戳,然后写入“优先级”属性。
所以在那之后我用 Prioritizer = PriorityAttributePrioritizer 设置了一个队列。
但是流文件在队列中停留的时间不够长,无法解决乱序问题,因为之后的所有处理器都很快(我正在处理 1200 个流文件/秒)。
a) 有没有办法让队列充当缓冲区,仍然保持速度,只是延迟足够长,以便队列中有相当数量的流文件被优先处理?
b) 或者我们还能如何解决乱序问题?
我为此设计了一个服务员process group
。在我的例子中,我不得不等待 10 秒来对我的流程进行排序。下面的进程组基本都是在一定时期内循环流动。
您也可以在 unmatched queue
上应用您的优先级,它应该足够大。
RouteOnAttribute
UpdateAttribute
我这里有一些东西看起来像是在低速下工作,但目前它在高速下崩溃了。
第一个块只是一个每秒生成流文件的模拟器。
UpdateAttribute 可以是生成用于排序的优先级属性的属性。
剩下的就是管爆了。
- 为每个流文件增加一个计数器
- 等待计数器达到极限
- 如果流文件通过,则重置计数器
第一个通知为每个流文件将计数器递增 1。
Wait只有在计数器达到5时才会让flowfile通过
流文件使用PriorityAttributePrioritizer之前的队列,所有其他队列将设置为FIFO。
最后一个将计数器重置为零的通知
我找到了一个解决方案,有两种方法,要么等待特定时间释放一组流文件(解决方案2),要么等待多个流文件(解决方案1)。
我正在使用一种或另一种方法,而不是同时使用这两种方法。
在此处的图片中:flow,我已经在解决方案 1 和解决方案 2 下进行了描述。
我正在接收可能无序到达的实时数据,有些数据比其他数据晚到达。对于稍后的事件处理,我需要重新排序流文件流,我正在尝试查看在 NiFi 中是否可行。
我对 EnforceOrder 的理解是它需要一个增量整数作为流文件属性,并且知道起点。
由于我正在处理实时流,所以我没有起点,所以我不知道如何让它发挥作用。
我所拥有的是我可以提取的数据中的时间戳,转换为 linux 时间戳,然后写入“优先级”属性。 所以在那之后我用 Prioritizer = PriorityAttributePrioritizer 设置了一个队列。 但是流文件在队列中停留的时间不够长,无法解决乱序问题,因为之后的所有处理器都很快(我正在处理 1200 个流文件/秒)。
a) 有没有办法让队列充当缓冲区,仍然保持速度,只是延迟足够长,以便队列中有相当数量的流文件被优先处理?
b) 或者我们还能如何解决乱序问题?
我为此设计了一个服务员process group
。在我的例子中,我不得不等待 10 秒来对我的流程进行排序。下面的进程组基本都是在一定时期内循环流动。
您也可以在 unmatched queue
上应用您的优先级,它应该足够大。
RouteOnAttribute
UpdateAttribute
我这里有一些东西看起来像是在低速下工作,但目前它在高速下崩溃了。
第一个块只是一个每秒生成流文件的模拟器。 UpdateAttribute 可以是生成用于排序的优先级属性的属性。
剩下的就是管爆了。
- 为每个流文件增加一个计数器
- 等待计数器达到极限
- 如果流文件通过,则重置计数器
第一个通知为每个流文件将计数器递增 1。
Wait只有在计数器达到5时才会让flowfile通过 流文件使用PriorityAttributePrioritizer之前的队列,所有其他队列将设置为FIFO。
最后一个将计数器重置为零的通知
我找到了一个解决方案,有两种方法,要么等待特定时间释放一组流文件(解决方案2),要么等待多个流文件(解决方案1)。 我正在使用一种或另一种方法,而不是同时使用这两种方法。
在此处的图片中:flow,我已经在解决方案 1 和解决方案 2 下进行了描述。