提取 CSV 列的值以添加属性

Extract Value of CSV Column to Add Attribute

我正在 NiFi 中处理一些 CSV,我的管道正在生成一些副本。因此,我想使用 DetectDuplicate 处理器,但为了做到这一点,我需要有一些属性可以与之比较以检测重复。我有一个 ExtractText 处理器,我想使用正则表达式来获取 SHA1_BASE16 列中的值。

我在下面的 CSV 文件中尝试了以下正则表达式字符串(朋友建议的,我并不完全理解),但突出显示了不正确的字段和一些无关的内容。我怎样才能让它捕获 SHA1_BASE16 值?

正则表达式

^[^,]*,[^,]*,[^,]*,[^,]*,[^,]*,([^,]*),[^,]*,[^,]*,[^,]*,[^,]*,[^,]*,([^,]*),[^,]*,[^,]*,([^,]*)\S*

CSV

"USER_JID","CREATED_AT","UPLOAD_TIME","SHA1_BASE32","SHA1_BASE16","HASH_SOURCE","MESSAGE_TYPE","IPV4"
"dreynolds","1932/04/01 20:23:35 UTC","2016/12/28 20:23:11 UTC","72F20077A79A0D4D90F4C0669FB6EA4BC5953293","FB1D928B83DEBCD2B2E53DF4C8D4C2AF99EB81E0","HOLLYWOOD","TWITTER","123.123.123.123"

实际输出

Match 1
Full match  0-291   "USER_JID","CREATED_AT","UPLOAD_TIME","SHA1_BASE32","SHA1_BASE16","HASH_SOURCE","MESSAGE_TYPE","IPV4...
Group 1.    66-79   "HASH_SOURCE"
Group 2.    209-251 "FB1D928B83DEBCD2B2E53DF4C8D4C2AF99EB81E0"
Group 3.    274-291 "123.123.123.123"

预期输出

Match 1
Full match  0-291   "USER_JID","CREATED_AT","UPLOAD_TIME","SHA1_BASE32","SHA1_BASE16","HASH_SOURCE","MESSAGE_TYPE","IPV4...
Group 1.    209-251 "FB1D928B83DEBCD2B2E53DF4C8D4C2AF99EB81E0"

我猜我们会在这里有两个 40 个字符的字符串,我们会使用第一个作为左边界,并应用这个简单的表达式:

.+"[A-Z0-9]{40}",("[A-Z0-9]{40}").+

我们想要的输出在这个捕获组中的位置:

("[A-Z0-9]{40}")

我们可以使用 </code>.</p> <h3><a href="https://regex101.com/r/jXAe8T/1/" rel="nofollow noreferrer">Demo</a></h3> <h3>测试</h3> <p><div class="snippet" data-lang="js" data-hide="false" data-console="true" data-babel="false"> <div class="snippet-code"> <pre><code>const regex = /.+"[A-Z0-9]{40}",("[A-Z0-9]{40}").+/gm; const str = `"dreynolds","1932/04/01 20:23:35 UTC","2016/12/28 20:23:11 UTC","72F20077A79A0D4D90F4C0669FB6EA4BC5953293","FB1D928B83DEBCD2B2E53DF4C8D4C2AF99EB81E0","HOLLYWOOD","TWITTER","123.123.123.123"`; let m; while ((m = regex.exec(str)) !== null) { // This is necessary to avoid infinite loops with zero-width matches if (m.index === regex.lastIndex) { regex.lastIndex++; } // The result can be accessed through the `m`-variable. m.forEach((match, groupIndex) => { console.log(`Found match, group ${groupIndex}: ${match}`); }); }

正则表达式电路

jex.im 可视化正则表达式:

或者,您可以使用 PartitionRecord 将记录拆分为流文件,其中每条记录都具有相同的分区字段值(在本例中为 SHA1_BASE16)。它还会在流文件上为分区值设置一个属性,然后您可以在 DetectDuplicate 中使用该属性。

对于高基数字段(不会有很多重复的字段),您可能会看到性能下降,因为每个传出流文件中可能只有一行,因此对于大量行,您将获取大量流文件。话虽如此,而不是 DetectDuplicate 下游,你可以改为 RouteOnAttribute where record.count > 1。这消除了对 DistributedMapCache 的需要。

还有一个 contribution to add a DetectDuplicateRecord processor,我认为这是您在这里真正想要的。该贡献正在审查中,我希望将其纳入下一个 NiFi 版本。