尝试使用 NifI 添加基于条件的字段
trying to add a field based on conditionals with NifI
我是 Apache NiFi 的新手,目前正在使用它来将消息数据路由到各个位置。我希望根据一组条件添加一些字段。
目前我有一个读取日志文件的 GetFile 处理器 ---> ExtractGrok 应用 grok 模式来解析 ---> ConvertRecord 从 Grok 转换为 Json。下一部分是我 stumped/not 确定下一步该做什么。
在我的 json 中,我有一个字段 refresh_time
我需要根据字段的某些条件创建 2 个新字段 refresh_time
类似于if refresh_time < 10 then cache = 1; else if refresh_time > 10 then reprocess = 1
这里的最终目标是可以在聚合中使用的数字字段 cache
和 refresh_time
。
根据条件添加 2 个数字字段的最佳方法是什么。是否有用于添加其他字段或更新记录以包含新字段的处理器?
谢谢。
有几种方法可以实现您想要的目标。
一个选项(更具可读性)
A QueryRecord 可以让您在记录中写一个 SQL 语句,然后根据结果拆分它们。例如
添加名为 cache
的动态 属性,其值为 SELECT * FROM FLOWFILE WHERE refresh_time < 10
。
添加名为 refresh
的动态 属性,其值为 SELECT * FROM FLOWFILE WHERE refresh_time > 10
。
QueryRecord 现在将具有关系 failure
、original
、cache
和 refresh
。
从 cache
和 refresh
分出一个 UpdateRecord,Replacement Value Strategy
设置为 Literal Value
。
对于 cache
关系,您可以添加名为 cache
的新动态 属性,其值为 1
。对于 refresh
关系,您可以添加名为 refresh
的新动态 属性,其值为 1
.
类似选项(可能性能更高)
如果你想避免额外的 UpdateRecord,你可以在 QueryRecord 中添加像这样的字段:
两个动态属性设置为:
cache
= SELECT *, 1 AS cache FROM FLOWFILE WHERE REFRESH < 10
reprocess
= SELECT *, 1 AS reprocess FROM FLOWFILE WHERE REFRESH > 10
由于磁盘读取次数较少,此选项的性能可能更高。
This gist is an example of the second option, you can import it to NiFi 试试看。
此外,仅供参考,您可以在 ConvertRecord 中使用 GrokReader 直接使用 Grok 解析到 JSON,可能会跳过 ExtractGrok。
我是 Apache NiFi 的新手,目前正在使用它来将消息数据路由到各个位置。我希望根据一组条件添加一些字段。
目前我有一个读取日志文件的 GetFile 处理器 ---> ExtractGrok 应用 grok 模式来解析 ---> ConvertRecord 从 Grok 转换为 Json。下一部分是我 stumped/not 确定下一步该做什么。
在我的 json 中,我有一个字段 refresh_time
我需要根据字段的某些条件创建 2 个新字段 refresh_time
类似于if refresh_time < 10 then cache = 1; else if refresh_time > 10 then reprocess = 1
这里的最终目标是可以在聚合中使用的数字字段 cache
和 refresh_time
。
根据条件添加 2 个数字字段的最佳方法是什么。是否有用于添加其他字段或更新记录以包含新字段的处理器?
谢谢。
有几种方法可以实现您想要的目标。
一个选项(更具可读性)
A QueryRecord 可以让您在记录中写一个 SQL 语句,然后根据结果拆分它们。例如
添加名为 cache
的动态 属性,其值为 SELECT * FROM FLOWFILE WHERE refresh_time < 10
。
添加名为 refresh
的动态 属性,其值为 SELECT * FROM FLOWFILE WHERE refresh_time > 10
。
QueryRecord 现在将具有关系 failure
、original
、cache
和 refresh
。
从 cache
和 refresh
分出一个 UpdateRecord,Replacement Value Strategy
设置为 Literal Value
。
对于 cache
关系,您可以添加名为 cache
的新动态 属性,其值为 1
。对于 refresh
关系,您可以添加名为 refresh
的新动态 属性,其值为 1
.
类似选项(可能性能更高)
如果你想避免额外的 UpdateRecord,你可以在 QueryRecord 中添加像这样的字段:
两个动态属性设置为:
cache
= SELECT *, 1 AS cache FROM FLOWFILE WHERE REFRESH < 10
reprocess
= SELECT *, 1 AS reprocess FROM FLOWFILE WHERE REFRESH > 10
由于磁盘读取次数较少,此选项的性能可能更高。
This gist is an example of the second option, you can import it to NiFi 试试看。
此外,仅供参考,您可以在 ConvertRecord 中使用 GrokReader 直接使用 Grok 解析到 JSON,可能会跳过 ExtractGrok。