Apache NiFi - NullPointerException 在自定义处理器上设置多个线程

Apache NiFi - NullPointerException setting more than one thread on custom processors

我目前正在研究 NiFi 流程,该流程需要实施自定义处理器以对 csv 记录应用转换。

我在我执行的一些基准测试中注意到了这种行为:如果只为每个自定义处理器分配一个线程,则一切正常。由于 java.lang.NullPointerException.

,将更多线程分配给自定义处理器会导致 无法处理会话

由于无法用单线程重现错误,我更多地考虑处理流文件的一些问题,或者我不知道的 NiFi 的某些方面。

访问流文件属性执行处理。永远不会读取流文件内容,并在添加一些属性后返回输出流文件。以下是处理器代码相关部分的片段:

@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {

    //  Will hold all the processed attributes
    Map<String, String> processedAttributes = new HashMap<>();
    FlowFile flowfile = session.get();
    ...
    //  Adds the attributes to the flowfile
    flowfile = session.putAllAttributes(flowfile, processedAttributes);
    session.transfer(flowfile, PROCESSED);
    }

我在 m4.4xlarge Amazon ec2 实例上 运行 NiFi 0.7。由于我正在寻求高性能(谁不追求),我正在寻找一种增加线程数量的安全方法。非常感谢任何建议。

提前致谢。

对于某些线程,session.get() 方法可能 return 为 null(尤其是对于多线程/并发任务)。如果因为有可用的流文件而安排了两个或更多线程,则会发生这种情况,然后一个线程获取流文件(通过 session.get()),而下一个线程将变为空。

在您的情况下,您可能不是在读取或写入流文件,而是其他方法(如 session.putAllAttributes())调用流文件上的方法。许多处理器添加一个检查以查看 flowfile == null 并将 return(因为它需要一个流文件来操作),也许这也可以解决您的问题。