NiFi:在流文件中的所有记录前添加(或附加)行号

NiFi : Prepend (orAppend) a line number to all records in a flowfile

我正在使用 NiFi 1.9.2

我正在阅读一个恰好是 csv 文件的文本文件。我在流文件的内容中有文件的内容。

内容是

a,b,c
d,e,f
g,h,i

我想为流文件中的所有记录添加一个行号并得到

1,a,b,c
2,d,e,f
3,g,h,i

每次我通过这个处理器输入文件时

我可以通过使用具有以下属性的 ReplaceText 处理器来实现接近的效果:

Search Value : (?m)(^.*$)
Replacement Value : ${nextInt()},

但是因为 nextInt() 在我得到的 运行 NiFi 实例的生命周期内持续存在它的价值

    0,a,b,c
    1,d,e,f
    2,g,h,i
for 1st execution

    3,a,b,c
    4,d,e,f
    5,g,h,i
for the next execution etc

此外,根据 NiFi 表达式语言指南,“计数器在所有 NiFi 组件之间共享,因此从一个处理器多次调用此函数将不能保证特定处理器上下文中的顺序值。”。 =14=]

  1. 有没有办法确保在 NiFi 实例的生命周期内每次执行此处理器的行号始终从 0 开始,并且始终是连续的?

  2. 计数器的范围是多少?

  3. 我可以让计数器从 1 开始吗?

您可以将内容拆分为多行,然后使用 fragment.index 为行添加计数器。之后你可以再次合并它们。

流程:

生成流文件:

拆分文本:

替换文字:

合并内容: 不要忘记向 Demarcator 属性添加新行 (Shift+Enter)。

结果:

如果要从零开始计数,可以使用${Fragment.index:minus(1)}