获取 Nifi PutSQL 处理器的最后执行时间戳

Get last execution timestamp of a Nifi PutSQL processor

有没有办法通过 REST-API 获取 PutSQL 处理器最后一次执行的时间戳?这样的时间戳是否存在,或者我可以自己建立一个吗?

设置: 我有 Airflow 来触发我的 Nifi-ETL,它以几个 PutSQL 处理器结束 - 完成这些之后我需要在 Airflow 中执行其他操作。

思路:我想触发第一个Nifi处理器,然后在Airflow中等待,直到更新最后一个PutSQL处理器的last_execution_timestamp

问题: 我尝试访问属性 statsLastRefreshed,但这不是最后一次执行时间,而是最后一次(用户/api-请求)访问导致 Nifi 刷新处理器的处理器。

s = processor["status"]["statsLastRefreshed"]  # '13:13:26 CEST'

我在 Airflow 的 REST API 文档中找不到任何内容。

我看到的唯一其他选择是从 Airflow 向最后一个 PutSQL 处理器的数据库 table 发出请求,以查看那里是否发生了任何新情况。

Nifi 是为连续数据流而生的。所以 Processor 统计中没有最后执行时间。

虽然 Nifi 能够进行 ETL(不是完全成熟的),但如果您的流程只需要文件和数据库访问,我相信 Airflow 比 Nifi 更适合 ETL 过程

为 ETL 任务集成 Airflow 和 Nifi 是一个有点复杂的架构。如果可能的话,您可以考虑选择其中之一,以免遇到您在这里描述的问题。

如果您的流程包含大量不同的输入、复杂的逻辑并摄取大文件,选择 Airflow only 并不容易。

我想出了一个解决方案。

  1. 在处理器中添加一个名为 my属性name 的自定义 属性,值为 ${now()}

  2. 任何通过处理器的流文件都将具有通过处理器时的时间戳作为属性!

  3. 在步骤 1 的处理器之后有一个 UpdateAttribute 处理器,选项(在处理器属性下)Store State 设置为 Store state locally.

  4. UpdateAttribute处理器中添加自定义属性名称为readable_property并将其设置为值${'mypropertyname'}.

处理器的状态现在包含最后一个流文件的值(例如,带有从步骤 1 开始执行 now() 方法的时间戳)。

  1. 通过 REST-API 和 URI /nifi-api/processors/{id}/state 上的 GET(例如在气流中)

返回的 JSON 包含以下行:

{
"key":"readable_property"
,"value":"Wed Apr 14 11:13:40 CEST 2021"
,"clusterNodeId":"some-id-0d8eb6052"
,"clusterNodeAddress":"some-host:port-number"
}

然后你只需要解析 JSON Airflow 中的值。

注意:在前一个处理器使用 now 将属性添加到流文件和流文件实际通过 UpdateAttribute 处理器之间会有轻微的延迟,您可以从那里读取时间戳。