生成单流文件以将其加载到 S3

Generating Single Flow file for loading it into S3

我有一个 Nifi Flow,它从 RDS 表中获取数据并作为平面文件加载到 S3 中,现在我需要生成另一个文件,该文件将具有我加载到 S3 存储桶中的文件的名称,这需要是一个单独的流程;

示例:如果 RDS 提取的平面文件名为 RDS.txt,则新生成的文件应包含 rds.txt 作为内容,我需要将此文件加载到同一个 S3 存储桶。

我面临的问题是我正在使用生成流文件处理器并将平面文件名添加为流文件中的自定义文本,但我无法为生成流文件处理器设置任何上游,所以这会生成更多文件,如果我在生成流文件处理器之后使用合并内容处理器,我可以在流文件中看到重复值。

谁能帮我解决这个问题

I have a Nifi Flow, which fetches a data from RDS tables and load into S3 as flat files, now i need to generate another file which will be having the name of the file that I am loading into S3 bucket, this needs to be a separate flow;

最简单的方法是在 PutS3Object 之后链接一些东西,这将用你想要的更新流文件内容。用 ExecuteScript 编写会非常简单。像这样:

def ff = session.get()
if (ff) {
  def updated = session.write(ff, {
    it.write(ff.getAttribute("filename").bytes)
  } as OutputStreamCallback)
  updated = session.putAttribute(updated, "is_updated", "true")
  session.transfer(updated, REL_SUCCESS)
}

然后你可以在 PutS3Object 之后放置一个 RouteOnAttribute 并且如果它检测到属性 is_updated 则让它路由到空路由,或者如果它检测到属性 PutS3Object 则路由回 PutS3Object还没更新。

我有一个简单的解决方案我在 put s3 对象之前添加了一个漏斗,漏斗的上游将接收两个文件,一个是提取物,另一个是文件名,漏斗的下游连接到 puts3 对象,因此这将同时加载两个文件