在 NiFi 中触发 FetchFolder?

Triggered FetchFolder in NiFi?

我正在使用 NiFi 使用专有处理工具(运行 external 到 NiFi)来编排大型二进制文件的处理。

NiFi 将源文件放在磁盘上,我调用外部工具(使用 ExecuteScript 处理器),该工具加载二进制文件并继续生成许多较小的文件。

当外部工具完全完成后,我需要"pick up"较小的(生成的)文件目录和继续通过 NiFi 处理。我需要等待,因为 [输出目录]、[文件数] 和 [处理所需时间] 是动态的。

问题:

  1. GetFile(获取目录)没有上游连接,处理完成后无法触发
  2. ListFile + FetchFile 组合不起作用b/c ListFile 没有上游连接,所以——再次——我无法在处理完成后触发它。

...那么我可以使用什么处理器,在完成二进制处理后,获取 new 文件的目录并将它们带入 NiFi 土地?

我假设您的外部工具有办法在完成时通知 NiFi,因为即使 GetFile 或 ListFile 支持传入流文件,您也需要它..

那么分两步如何...

外部工具写入目录 1,完成后调用 HandleHttpRequest 处理器提供的 REST API,然后转到调用 "mv directory-1 directory-2" 的 ExecuteScript 处理器。

ListFile 处理器一直在监视 directory-2,但在上面的移动命令执行之前看不到任何东西。

有点符合@Bryan Bende 的回答,我最终使用 ExecuteScript 处理器创建了一个提供上游连接的 "ListFile" 处理器:

import java.nio.charset.StandardCharsets
import groovy.io.FileType
def flowFile = session.get()
if(!flowFile) return
def flowFiles = [] as List<FlowFile>
def fetchDirectory = flowFile.getAttribute('fetchDirectory')
def listOfFiles = []
def dir = new File(fetchDirectory)
if(dir.exists()) {
   dir.eachFileRecurse (FileType.FILES) { file ->
      listOfFiles << file
   }
}
listOfFiles.each { i ->
   def newFlowFile = session.create()
   session.putAttribute(newFlowFile, 'path', i.path)
   session.putAttribute(newFlowFile, 'filename', i.getName())
   flowFiles << newFlowFile
}
session.remove(flowFile)
session.transfer(flowFiles, REL_SUCCESS)

因此,当外部工具完成时,我将块的 FlowFile 路由到上述处理器,然后通过管道传输到 FetchFile 处理器。