获取 Nifi PutSQL 处理器的最后执行时间戳
Get last execution timestamp of a Nifi PutSQL processor
有没有办法通过 REST-API 获取 PutSQL
处理器最后一次执行的时间戳?这样的时间戳是否存在,或者我可以自己建立一个吗?
设置: 我有 Airflow 来触发我的 Nifi-ETL,它以几个 PutSQL 处理器结束 - 完成这些之后我需要在 Airflow 中执行其他操作。
思路:我想触发第一个Nifi处理器,然后在Airflow中等待,直到更新最后一个PutSQL处理器的last_execution_timestamp
。
问题:
我尝试访问属性 statsLastRefreshed
,但这不是最后一次执行时间,而是最后一次(用户/api-请求)访问导致 Nifi 刷新处理器的处理器。
s = processor["status"]["statsLastRefreshed"] # '13:13:26 CEST'
我在 Airflow 的 REST API 文档中找不到任何内容。
我看到的唯一其他选择是从 Airflow 向最后一个 PutSQL 处理器的数据库 table 发出请求,以查看那里是否发生了任何新情况。
Nifi 是为连续数据流而生的。所以 Processor
统计中没有最后执行时间。
虽然 Nifi 能够进行 ETL(不是完全成熟的),但如果您的流程只需要文件和数据库访问,我相信 Airflow 比 Nifi 更适合 ETL 过程
为 ETL 任务集成 Airflow 和 Nifi 是一个有点复杂的架构。如果可能的话,您可以考虑选择其中之一,以免遇到您在这里描述的问题。
如果您的流程包含大量不同的输入、复杂的逻辑并摄取大文件,选择 Airflow only 并不容易。
我想出了一个解决方案。
在处理器中添加一个名为 my属性name 的自定义 属性,值为 ${now()}
任何通过处理器的流文件都将具有通过处理器时的时间戳作为属性!
在步骤 1 的处理器之后有一个 UpdateAttribute
处理器,选项(在处理器属性下)Store State 设置为 Store state locally
.
在UpdateAttribute
处理器中添加自定义属性名称为readable_property并将其设置为值${'mypropertyname'}
.
处理器的状态现在包含最后一个流文件的值(例如,带有从步骤 1 开始执行 now()
方法的时间戳)。
- 通过 REST-API 和 URI
/nifi-api/processors/{id}/state
上的 GET(例如在气流中)
返回的 JSON 包含以下行:
{
"key":"readable_property"
,"value":"Wed Apr 14 11:13:40 CEST 2021"
,"clusterNodeId":"some-id-0d8eb6052"
,"clusterNodeAddress":"some-host:port-number"
}
然后你只需要解析 JSON Airflow 中的值。
注意:在前一个处理器使用 now
将属性添加到流文件和流文件实际通过 UpdateAttribute
处理器之间会有轻微的延迟,您可以从那里读取时间戳。
有没有办法通过 REST-API 获取 PutSQL
处理器最后一次执行的时间戳?这样的时间戳是否存在,或者我可以自己建立一个吗?
设置: 我有 Airflow 来触发我的 Nifi-ETL,它以几个 PutSQL 处理器结束 - 完成这些之后我需要在 Airflow 中执行其他操作。
思路:我想触发第一个Nifi处理器,然后在Airflow中等待,直到更新最后一个PutSQL处理器的last_execution_timestamp
。
问题:
我尝试访问属性 statsLastRefreshed
,但这不是最后一次执行时间,而是最后一次(用户/api-请求)访问导致 Nifi 刷新处理器的处理器。
s = processor["status"]["statsLastRefreshed"] # '13:13:26 CEST'
我在 Airflow 的 REST API 文档中找不到任何内容。
我看到的唯一其他选择是从 Airflow 向最后一个 PutSQL 处理器的数据库 table 发出请求,以查看那里是否发生了任何新情况。
Nifi 是为连续数据流而生的。所以 Processor
统计中没有最后执行时间。
虽然 Nifi 能够进行 ETL(不是完全成熟的),但如果您的流程只需要文件和数据库访问,我相信 Airflow 比 Nifi 更适合 ETL 过程
为 ETL 任务集成 Airflow 和 Nifi 是一个有点复杂的架构。如果可能的话,您可以考虑选择其中之一,以免遇到您在这里描述的问题。
如果您的流程包含大量不同的输入、复杂的逻辑并摄取大文件,选择 Airflow only 并不容易。
我想出了一个解决方案。
在处理器中添加一个名为 my属性name 的自定义 属性,值为
${now()}
任何通过处理器的流文件都将具有通过处理器时的时间戳作为属性!
在步骤 1 的处理器之后有一个
UpdateAttribute
处理器,选项(在处理器属性下)Store State 设置为Store state locally
.在
UpdateAttribute
处理器中添加自定义属性名称为readable_property并将其设置为值${'mypropertyname'}
.
处理器的状态现在包含最后一个流文件的值(例如,带有从步骤 1 开始执行 now()
方法的时间戳)。
- 通过 REST-API 和 URI
/nifi-api/processors/{id}/state
上的 GET(例如在气流中)
返回的 JSON 包含以下行:
{
"key":"readable_property"
,"value":"Wed Apr 14 11:13:40 CEST 2021"
,"clusterNodeId":"some-id-0d8eb6052"
,"clusterNodeAddress":"some-host:port-number"
}
然后你只需要解析 JSON Airflow 中的值。
注意:在前一个处理器使用 now
将属性添加到流文件和流文件实际通过 UpdateAttribute
处理器之间会有轻微的延迟,您可以从那里读取时间戳。