访问 NiFi 流文件的谱系
Accessing lineage of a NiFi flow file
我正在为 NiFi 的流文件开发某种错误处理,例如数据库子系统拒绝从流文件写入数据,因为数据不符合预期,因为该数据的源系统缺少一些主数据。
因此,此错误处理将数据写入 MongoDB 并提供更多出错信息。
其中一个 'more information' 是此流文件的某种堆栈跟踪,表示数据沿袭。为此,我编写了一个带有 Groovy 脚本的 InvokeScriptedProcessor 来实现这一点。
这是脚本的重要部分:
ArrayList getStacktrace(flowfileUuid){
def lineage = this.provenanceRepository.createLineageQuery(flowfileUuid)
def lineageData = this.provenanceRepository.getLineageData(lineage.id)
if (lineageData.results == null || lineageData.results.nodes.size() == 0){
println "cannot find stacktrace for ${flowfileUuid}."
return []
}
def eventIds = lineageData.results.nodes.findAll {n -> n.type == 'EVENT'}.collect {n -> n.id }.sort()
def provenanceEvents = []
for (eventId in eventIds){
provenanceEvents << this.provenanceRepository.getProvenanceEvent(eventId).provenanceEvent.componentName
}
this.provenanceRepository.deleteLineageQuery(lineage.id)
return provenanceEvents
}
对于 createLineageQuery
我 POSTING
到 nifi-api /nifi-api/provenance/lineage
在正文中添加流文件的 uuid
。结果是查询的 ID
。我正在使用这个 ID
到 getLineageData
;还有一个 属性 finished
,我正在等待查询完成。
使用此沿袭数据,我 getProvenanceEvent
数据并将组件(处理器)的名称写入数组。
之后,我 deleteLineageQuery
如文档中所述。
所以这将是我的堆栈跟踪。
现在的问题是,当流文件第一次点击这个 InvokeScriptedProcessor
时,沿袭数据是空的。我尝试了很多事情,比如等待之类的。没用。
现在奇怪的是,当我为这个处理器重放流文件时,谱系数据不为空。
所以这种行为并不像我预期的那样是确定性的。
第一次处理流文件时,有时沿袭数据不为空
我也用 Fiddler 试过这个东西,它一直在那里工作。
我的做法有问题吗?
我目前正在使用 NiFi 1.6.0
.
编辑:
我会把布莱恩的回答作为解决方案。
我会尽快调查,但听起来是正确的。不过,我用 NiFi 1.8.0
尝试了我的解决方案,它按预期工作。所以目前我对我在第一步中实现它的方式很好,但我会根据 Bryan 的建议改进我的解决方案。
我不完全确定问题出在哪里,但一般来说,出处数据并不是真的要从处理器访问,这就是为什么没有 API 由会话或上下文提供的原因让您检索出处事件,只允许创建事件。
为了 运行 来源查询,需要对事件进行索引,并且无法保证与处理流文件的时间相关的索引何时发生。所以可能事件还不可见。
ReportingTask 是访问来源事件的预期方式,可用于将它们从 NiFi 推送到某个外部系统以进行长期存储。
我正在为 NiFi 的流文件开发某种错误处理,例如数据库子系统拒绝从流文件写入数据,因为数据不符合预期,因为该数据的源系统缺少一些主数据。
因此,此错误处理将数据写入 MongoDB 并提供更多出错信息。
其中一个 'more information' 是此流文件的某种堆栈跟踪,表示数据沿袭。为此,我编写了一个带有 Groovy 脚本的 InvokeScriptedProcessor 来实现这一点。
这是脚本的重要部分:
ArrayList getStacktrace(flowfileUuid){
def lineage = this.provenanceRepository.createLineageQuery(flowfileUuid)
def lineageData = this.provenanceRepository.getLineageData(lineage.id)
if (lineageData.results == null || lineageData.results.nodes.size() == 0){
println "cannot find stacktrace for ${flowfileUuid}."
return []
}
def eventIds = lineageData.results.nodes.findAll {n -> n.type == 'EVENT'}.collect {n -> n.id }.sort()
def provenanceEvents = []
for (eventId in eventIds){
provenanceEvents << this.provenanceRepository.getProvenanceEvent(eventId).provenanceEvent.componentName
}
this.provenanceRepository.deleteLineageQuery(lineage.id)
return provenanceEvents
}
对于 createLineageQuery
我 POSTING
到 nifi-api /nifi-api/provenance/lineage
在正文中添加流文件的 uuid
。结果是查询的 ID
。我正在使用这个 ID
到 getLineageData
;还有一个 属性 finished
,我正在等待查询完成。
使用此沿袭数据,我 getProvenanceEvent
数据并将组件(处理器)的名称写入数组。
之后,我 deleteLineageQuery
如文档中所述。
所以这将是我的堆栈跟踪。
现在的问题是,当流文件第一次点击这个 InvokeScriptedProcessor
时,沿袭数据是空的。我尝试了很多事情,比如等待之类的。没用。
现在奇怪的是,当我为这个处理器重放流文件时,谱系数据不为空。
所以这种行为并不像我预期的那样是确定性的。
第一次处理流文件时,有时沿袭数据不为空
我也用 Fiddler 试过这个东西,它一直在那里工作。
我的做法有问题吗?
我目前正在使用 NiFi 1.6.0
.
编辑:
我会把布莱恩的回答作为解决方案。
我会尽快调查,但听起来是正确的。不过,我用 NiFi 1.8.0
尝试了我的解决方案,它按预期工作。所以目前我对我在第一步中实现它的方式很好,但我会根据 Bryan 的建议改进我的解决方案。
我不完全确定问题出在哪里,但一般来说,出处数据并不是真的要从处理器访问,这就是为什么没有 API 由会话或上下文提供的原因让您检索出处事件,只允许创建事件。
为了 运行 来源查询,需要对事件进行索引,并且无法保证与处理流文件的时间相关的索引何时发生。所以可能事件还不可见。
ReportingTask 是访问来源事件的预期方式,可用于将它们从 NiFi 推送到某个外部系统以进行长期存储。