Apache NiFi:同时输出到多个 FlowFiles?
Apache NiFi: Output to multiple FlowFiles simultaneously?
有没有办法在 NiFi 的自定义处理器中同时写入不同的流?例如,我有第三方库使用像这样工作的 API 进行重要处理:
public void process(InputStream in, OutputStream foo, OutputStream baa, List<String> args)
{
...
foo.write(things);
baa.write(stuff);
...
}
但我能找到的唯一示例都只使用一个输出流:
FlowFile transform = session.write(original, new OutputStreamCallback() {
@Override
public void process(OutputStream out) throws IOException {
out.write("stuff");
}
});
处理是分批进行的(由于规模大),所以执行所有处理然后写出单独的流程是不切实际的。
我能想到的唯一方法是多次处理输入:(
澄清一下,我想写入多个 FlowFiles,使用 session.write(flowfile, callback)
方法,因此不同的流可以 sent/managed 分开
NiFi API 基于一次对一个流文件进行操作,但您应该可以这样做:
FlowFile flowFile1 = session.create();
final AtomicReference<FlowFile> holder = new AtomicReference<>(session.create());
flowFile1 = session.write(flowFile1, new OutputStreamCallback() {
@Override
public void process(OutputStream out) throws IOException {
FlowFile flowFile2 = session.write(holder.get(), new OutputStreamCallback() {
@Override
public void process(OutputStream out) throws IOException {
}
});
holder.set(flowFile2);
}
});
由于您是从同一输入生成不同的输出,因此您还可以考虑将这些步骤分解为专注于执行其特定功能的离散处理器。上面你显示 "things" 和 "stuff" 所以例如我建议你有一个 'DoThings' 和 'DoStuff' 处理器。在您的流程中,您可以通过简单地使用源连接两次来向两者发送相同的流程文件。这样就可以实现很好的并行操作,并允许它们具有不同的 runtimes/etc。 NiFi 仍会为您保留出处踪迹,它实际上根本不会复制字节,而是传递指向原始内容的指针。
有没有办法在 NiFi 的自定义处理器中同时写入不同的流?例如,我有第三方库使用像这样工作的 API 进行重要处理:
public void process(InputStream in, OutputStream foo, OutputStream baa, List<String> args)
{
...
foo.write(things);
baa.write(stuff);
...
}
但我能找到的唯一示例都只使用一个输出流:
FlowFile transform = session.write(original, new OutputStreamCallback() {
@Override
public void process(OutputStream out) throws IOException {
out.write("stuff");
}
});
处理是分批进行的(由于规模大),所以执行所有处理然后写出单独的流程是不切实际的。
我能想到的唯一方法是多次处理输入:(
澄清一下,我想写入多个 FlowFiles,使用 session.write(flowfile, callback)
方法,因此不同的流可以 sent/managed 分开
NiFi API 基于一次对一个流文件进行操作,但您应该可以这样做:
FlowFile flowFile1 = session.create();
final AtomicReference<FlowFile> holder = new AtomicReference<>(session.create());
flowFile1 = session.write(flowFile1, new OutputStreamCallback() {
@Override
public void process(OutputStream out) throws IOException {
FlowFile flowFile2 = session.write(holder.get(), new OutputStreamCallback() {
@Override
public void process(OutputStream out) throws IOException {
}
});
holder.set(flowFile2);
}
});
由于您是从同一输入生成不同的输出,因此您还可以考虑将这些步骤分解为专注于执行其特定功能的离散处理器。上面你显示 "things" 和 "stuff" 所以例如我建议你有一个 'DoThings' 和 'DoStuff' 处理器。在您的流程中,您可以通过简单地使用源连接两次来向两者发送相同的流程文件。这样就可以实现很好的并行操作,并允许它们具有不同的 runtimes/etc。 NiFi 仍会为您保留出处踪迹,它实际上根本不会复制字节,而是传递指向原始内容的指针。