将流文件 attribute/content 读取到处理器 属性
Read flow file attribute/content to processor property
我想根据最后一个流文件的内容设置一个 属性 处理器。
示例:我使用处理器 GenerateFlowFile
实例化流文件,并使用自定义文本 ${now()}
作为创建流文件期间的当前时间戳。
我想要一个处理器(哪种与我无关)来读取流文件的内容(时间戳)到处理器的自定义属性 property_name
。之后我希望能够通过 REST-API 查询处理器并从处理器读取 属性。
最初我以为我可以用 ExtractText
处理器来做到这一点,但它基于正则表达式提取文本并将其写回流文件,而我想将该信息保存在处理器中直到下一个流文件到了。
您应该使用 UpdateAttribute 处理器。
您可以阅读几种方法 - f.e。
你不能通过 NiFi 做到这一点。当处理器 运行 你不能更新它的配置。
也许您可以在 UpdateAttribute 上使用状态变量?
Stateful Usage
By selecting "store state locally" option for the "Store State"
property UpdateAttribute will not only store the evaluated properties
as attributes of the FlowFile but also as stateful variables to be
referenced in a recursive fashion. This enables the processor to
calculate things like the sum or count of incoming FlowFiles. A
dynamic property can be referenced as a stateful variable like so:
Dynamic Property key : theCount value :
${getStateValue("theCount"):plus(1)}
This example will keep a count of
the total number of FlowFiles that have passed through the processor.
To use logic on top of State, simply use the "Advanced Usage" of
UpdateAttribute. All Actions will be stored as stateful attributes as
well as being added to FlowFiles. Using the "Advanced Usage" it is
possible to keep track of things like a maximum value of the flow so
far. This would be done by having a condition of
"${getStateValue("maxValue"):lt(${value})}"
and an action of
attribute:"maxValue", value:"${value}". The "Stateful Variables
Initial Value" property is used to initialize the stateful variables
and is required to be set if running statefully. Some logic rules will
require a very high initial value, like using the Advanced rules to
determine the minimum value. If stateful properties reference other
stateful properties then the value for the other stateful properties
will be an iteration behind. For example, attempting to calculate the
average of the incoming stream requires the sum and count. If all
three properties are set in the same UpdateAttribute (like below) then
the Average will always not include the most recent values of count
and sum:
Count key : theCount value : ${getStateValue("theCount"):plus(1)} Sum> key : theSum value : ${getStateValue("theSum"):plus(${flowfileValue})}
Average key : theAverage value :
${getStateValue("theSum"):divide(getStateValue("theCount"))}
Instead,
since average only relies on theCount and theSum attributes (which are
added to the FlowFile as well) there should be a following Stateless
UpdateAttribute which properly calculates the average. In the event
that the processor is unable to get the state at the beginning of the
onTrigger, the FlowFile will be pushed back to the originating
relationship and the processor will yield. If the processor is able to
get the state at the beginning of the onTrigger but unable to set the
state after adding attributes to the FlowFile, the FlowFile will be
transferred to "set state fail". This is normally due to the state not
being the most up to date version (another thread has replaced the
state with another version). In most use-cases this relationship
should loop back to the processor since the only affected attributes
will be overwritten. Note: Currently the only "stateful" option is to
store state locally. This is done because the current implementation
of clustered state relies on Zookeeper and Zookeeper isn't designed
for the type of load/throughput UpdateAttribute with state would
demand. In the future, if/when multiple different clustered state
options are added, UpdateAttribute will be updated.
感谢@Ivan,我能够创建一个完整的工作解决方案 - 以供将来参考:
实例化流程文件,例如GenerateFlowFile
处理器并添加自定义 属性“my属性”和值 ${now()}
(注意:您可以将此 属性 添加到任何处理器中的流文件, 不必是 GenerateFlowFile
处理器)
有一个 UpdateAttribute
处理器,其选项(在处理器属性下)存储状态 设置为 Store state locally
。
在UpdateAttribute
处理器中添加自定义属性名称为readable_property并将其设置为值${'myproperty'}
.
处理器的状态现在包含最后一个流文件的值(例如,带有属性添加到流文件的时间戳)。
额外奖励:
- 通过 REST-API 和 URI 上的 GET
/nifi-api/processors/{id}/state
返回的 JSON 包含以下行:
{
"key":"readable_property"
,"value":"Wed Apr 14 11:13:40 CEST 2021"
,"clusterNodeId":"some-id-0d8eb6052"
,"clusterNodeAddress":"some-host:port-number"
}
然后你只需要解析 JSON 的值。
我想根据最后一个流文件的内容设置一个 属性 处理器。
示例:我使用处理器 GenerateFlowFile
实例化流文件,并使用自定义文本 ${now()}
作为创建流文件期间的当前时间戳。
我想要一个处理器(哪种与我无关)来读取流文件的内容(时间戳)到处理器的自定义属性 property_name
。之后我希望能够通过 REST-API 查询处理器并从处理器读取 属性。
最初我以为我可以用 ExtractText
处理器来做到这一点,但它基于正则表达式提取文本并将其写回流文件,而我想将该信息保存在处理器中直到下一个流文件到了。
您应该使用 UpdateAttribute 处理器。
您可以阅读几种方法 - f.e。
你不能通过 NiFi 做到这一点。当处理器 运行 你不能更新它的配置。
也许您可以在 UpdateAttribute 上使用状态变量?
Stateful Usage
By selecting "store state locally" option for the "Store State" property UpdateAttribute will not only store the evaluated properties as attributes of the FlowFile but also as stateful variables to be referenced in a recursive fashion. This enables the processor to calculate things like the sum or count of incoming FlowFiles. A dynamic property can be referenced as a stateful variable like so:
Dynamic Property key : theCount value :
${getStateValue("theCount"):plus(1)}
This example will keep a count of the total number of FlowFiles that have passed through the processor. To use logic on top of State, simply use the "Advanced Usage" of UpdateAttribute. All Actions will be stored as stateful attributes as well as being added to FlowFiles. Using the "Advanced Usage" it is possible to keep track of things like a maximum value of the flow so far. This would be done by having a condition of"${getStateValue("maxValue"):lt(${value})}"
and an action of attribute:"maxValue", value:"${value}". The "Stateful Variables Initial Value" property is used to initialize the stateful variables and is required to be set if running statefully. Some logic rules will require a very high initial value, like using the Advanced rules to determine the minimum value. If stateful properties reference other stateful properties then the value for the other stateful properties will be an iteration behind. For example, attempting to calculate the average of the incoming stream requires the sum and count. If all three properties are set in the same UpdateAttribute (like below) then the Average will always not include the most recent values of count and sum:Count key : theCount value :
${getStateValue("theCount"):plus(1)} Sum> key : theSum value : ${getStateValue("theSum"):plus(${flowfileValue})}
Average key : theAverage value :${getStateValue("theSum"):divide(getStateValue("theCount"))}
Instead, since average only relies on theCount and theSum attributes (which are added to the FlowFile as well) there should be a following Stateless UpdateAttribute which properly calculates the average. In the event that the processor is unable to get the state at the beginning of the onTrigger, the FlowFile will be pushed back to the originating relationship and the processor will yield. If the processor is able to get the state at the beginning of the onTrigger but unable to set the state after adding attributes to the FlowFile, the FlowFile will be transferred to "set state fail". This is normally due to the state not being the most up to date version (another thread has replaced the state with another version). In most use-cases this relationship should loop back to the processor since the only affected attributes will be overwritten. Note: Currently the only "stateful" option is to store state locally. This is done because the current implementation of clustered state relies on Zookeeper and Zookeeper isn't designed for the type of load/throughput UpdateAttribute with state would demand. In the future, if/when multiple different clustered state options are added, UpdateAttribute will be updated.
感谢@Ivan,我能够创建一个完整的工作解决方案 - 以供将来参考:
实例化流程文件,例如
GenerateFlowFile
处理器并添加自定义 属性“my属性”和值${now()}
(注意:您可以将此 属性 添加到任何处理器中的流文件, 不必是GenerateFlowFile
处理器)有一个
UpdateAttribute
处理器,其选项(在处理器属性下)存储状态 设置为Store state locally
。在
UpdateAttribute
处理器中添加自定义属性名称为readable_property并将其设置为值${'myproperty'}
.
处理器的状态现在包含最后一个流文件的值(例如,带有属性添加到流文件的时间戳)。
额外奖励:
- 通过 REST-API 和 URI 上的 GET
/nifi-api/processors/{id}/state
返回的 JSON 包含以下行:
{
"key":"readable_property"
,"value":"Wed Apr 14 11:13:40 CEST 2021"
,"clusterNodeId":"some-id-0d8eb6052"
,"clusterNodeAddress":"some-host:port-number"
}
然后你只需要解析 JSON 的值。