尝试使用 NifI 添加基于条件的字段

trying to add a field based on conditionals with NifI

我是 Apache NiFi 的新手,目前正在使用它来将消息数据路由到各个位置。我希望根据一组条件添加一些字段。

目前我有一个读取日志文件的 GetFile 处理器 ---> ExtractGrok 应用 grok 模式来解析 ---> ConvertRecord 从 Grok 转换为 Json。下一部分是我 stumped/not 确定下一步该做什么。

在我的 json 中,我有一个字段 refresh_time 我需要根据字段的某些条件创建 2 个新字段 refresh_time

类似于if refresh_time < 10 then cache = 1; else if refresh_time > 10 then reprocess = 1

这里的最终目标是可以在聚合中使用的数字字段 cacherefresh_time

根据条件添加 2 个数字字段的最佳方法是什么。是否有用于添加其他字段或更新记录以包含新字段的处理器?

谢谢。

有几种方法可以实现您想要的目标。

一个选项(更具可读性)

A QueryRecord 可以让您在记录中写一个 SQL 语句,然后根据结果拆分它们。例如

添加名为 cache 的动态 属性,其值为 SELECT * FROM FLOWFILE WHERE refresh_time < 10

添加名为 refresh 的动态 属性,其值为 SELECT * FROM FLOWFILE WHERE refresh_time > 10

QueryRecord 现在将具有关系 failureoriginalcacherefresh

cacherefresh 分出一个 UpdateRecordReplacement Value Strategy 设置为 Literal Value

对于 cache 关系,您可以添加名为 cache 的新动态 属性,其值为 1。对于 refresh 关系,您可以添加名为 refresh 的新动态 属性,其值为 1.

类似选项(可能性能更高)

如果你想避免额外的 UpdateRecord,你可以在 QueryRecord 中添加像这样的字段:

两个动态属性设置为:

cache = SELECT *, 1 AS cache FROM FLOWFILE WHERE REFRESH < 10

reprocess = SELECT *, 1 AS reprocess FROM FLOWFILE WHERE REFRESH > 10

由于磁盘读取次数较少,此选项的性能可能更高。

This gist is an example of the second option, you can import it to NiFi 试试看。

此外,仅供参考,您可以在 ConvertRecord 中使用 GrokReader 直接使用 Grok 解析到 JSON,可能会跳过 ExtractGrok。