Read flow file attribute/content to processor property

I want to set a property of a processor based on the content of the last flow file.

Example: I instantiate flow files with the GenerateFlowFile processor and use the custom text ${now()} to capture the current timestamp at the time the flow file is created.

I would like a processor (which kind does not matter to me) to read the flow file's content (the timestamp) into a custom processor property property_name. Afterwards I want to be able to query the processor via the REST API and read that property from the processor.

Initially I thought I could do this with the ExtractText processor, but it extracts text based on regular expressions and writes it back to the flow file, whereas I want to keep that information in the processor until the next flow file arrives.

You should use the UpdateAttribute processor. You can read about several approaches, e.g.

You cannot do this via NiFi. While a processor is running you cannot update its configuration.

Maybe you can use stateful variables on UpdateAttribute?

Stateful Usage

By selecting "store state locally" option for the "Store State" property UpdateAttribute will not only store the evaluated properties as attributes of the FlowFile but also as stateful variables to be referenced in a recursive fashion. This enables the processor to calculate things like the sum or count of incoming FlowFiles. A dynamic property can be referenced as a stateful variable like so:

Dynamic Property:
key: theCount
value: ${getStateValue("theCount"):plus(1)}

This example will keep a count of the total number of FlowFiles that have passed through the processor. To use logic on top of State, simply use the "Advanced Usage" of UpdateAttribute. All Actions will be stored as stateful attributes as well as being added to FlowFiles. Using the "Advanced Usage" it is possible to keep track of things like a maximum value of the flow so far. This would be done by having a condition of ${getStateValue("maxValue"):lt(${value})} and an action of attribute: maxValue, value: ${value}.

The "Stateful Variables Initial Value" property is used to initialize the stateful variables and is required to be set if running statefully. Some logic rules will require a very high initial value, like using the Advanced rules to determine the minimum value.

If stateful properties reference other stateful properties then the value for the other stateful properties will be an iteration behind. For example, attempting to calculate the average of the incoming stream requires the sum and count. If all three properties are set in the same UpdateAttribute (like below) then the average will never include the most recent values of count and sum:

Count — key: theCount, value: ${getStateValue("theCount"):plus(1)}
Sum — key: theSum, value: ${getStateValue("theSum"):plus(${flowfileValue})}
Average — key: theAverage, value: ${getStateValue("theSum"):divide(getStateValue("theCount"))}

Instead, since the average only relies on the theCount and theSum attributes (which are added to the FlowFile as well), there should be a following stateless UpdateAttribute which properly calculates the average.

In the event that the processor is unable to get the state at the beginning of onTrigger, the FlowFile will be pushed back to the originating relationship and the processor will yield. If the processor is able to get the state at the beginning of onTrigger but unable to set the state after adding attributes to the FlowFile, the FlowFile will be transferred to "set state fail". This is normally due to the state not being the most up-to-date version (another thread has replaced the state with another version). In most use cases this relationship should loop back to the processor, since the only affected attributes will be overwritten.

Note: currently the only "stateful" option is to store state locally. This is because the current implementation of clustered state relies on ZooKeeper, and ZooKeeper isn't designed for the type of load/throughput that UpdateAttribute with state would demand. In the future, if/when multiple different clustered state options are added, UpdateAttribute will be updated.
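The "one iteration behind" behaviour described above can be sketched in plain Python (this is not NiFi code; the attribute names mirror the documentation's count/sum/average example, and the snapshot of the old state stands in for how the stateful expressions are evaluated):

```python
def process(values):
    """Simulate UpdateAttribute stateful properties over a stream of values."""
    state = {"theCount": 0, "theSum": 0, "theAverage": 0}
    for v in values:
        # All three expressions see the same snapshot of the previous state,
        # which is why theAverage lags one FlowFile behind.
        old = dict(state)
        state["theCount"] = old["theCount"] + 1    # ${getStateValue("theCount"):plus(1)}
        state["theSum"] = old["theSum"] + v        # ${getStateValue("theSum"):plus(${flowfileValue})}
        state["theAverage"] = (old["theSum"] / old["theCount"]) if old["theCount"] else 0
    return state

# After FlowFiles [10, 20]: theCount=2, theSum=30, but theAverage=10/1=10.0,
# one iteration behind the true average of 15.
print(process([10, 20]))
```

This is exactly why the documentation recommends computing the average in a separate, stateless UpdateAttribute downstream.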

Thanks to @Ivan I was able to build a complete working solution - for future reference:

  1. Instantiate flow files, e.g. with the GenerateFlowFile processor, and add a custom property "myproperty" with the value ${now()} (note: you can add this property to the flow file in any processor, it does not have to be the GenerateFlowFile processor)

  2. Add an UpdateAttribute processor with the option Store State (under the processor's properties) set to Store state locally

  3. In the UpdateAttribute processor, add a custom property named readable_property and set it to the value ${'myproperty'}.

The state of the processor now holds the value from the last flow file (e.g. the timestamp that was added to the flow file as an attribute).

Bonus:

  1. Query the processor via the REST API with a GET on the URI /nifi-api/processors/{id}/state

The returned JSON contains the following lines:

{
"key":"readable_property"
,"value":"Wed Apr 14 11:13:40 CEST 2021"
,"clusterNodeId":"some-id-0d8eb6052"
,"clusterNodeAddress":"some-host:port-number"
}

Then you only need to parse the value out of the JSON.
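A minimal Python sketch of that parsing step. Only the entry fields (key, value, clusterNodeId, clusterNodeAddress) are taken from the sample response above; the surrounding componentState/localState/state nesting is an assumption about how the NiFi REST API wraps the entries and may differ between versions:

```python
import json

# Assumed shape of the body returned by GET /nifi-api/processors/{id}/state;
# the inner entry matches the sample response shown above.
sample_body = """
{
  "componentState": {
    "localState": {
      "state": [
        {
          "key": "readable_property",
          "value": "Wed Apr 14 11:13:40 CEST 2021",
          "clusterNodeId": "some-id-0d8eb6052",
          "clusterNodeAddress": "some-host:port-number"
        }
      ]
    }
  }
}
"""

def read_state_value(body, key):
    """Return the value of the state entry whose key matches `key`."""
    entries = json.loads(body)["componentState"]["localState"]["state"]
    return next(e["value"] for e in entries if e["key"] == key)

print(read_state_value(sample_body, "readable_property"))
# Wed Apr 14 11:13:40 CEST 2021
```

In a real flow you would fetch the body with an authenticated HTTP GET against your NiFi instance first, then feed it to read_state_value.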