Apache NiFi - NullPointerException 在自定义处理器上设置多个线程
Apache NiFi - NullPointerException setting more than one thread on custom processors
我目前正在研究 NiFi 流程,该流程需要实施自定义处理器以对 csv 记录应用转换。
我在我执行的一些基准测试中注意到了这种行为:如果只为每个自定义处理器分配一个线程,则一切正常。由于 java.lang.NullPointerException.
,将更多线程分配给自定义处理器会导致 无法处理会话
由于无法用单线程重现错误,我更多地考虑处理流文件的一些问题,或者我不知道的 NiFi 的某些方面。
访问流文件属性执行处理。永远不会读取流文件内容,并在添加一些属性后返回输出流文件。以下是处理器代码相关部分的片段:
@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
// Will hold all the processed attributes
Map<String, String> processedAttributes = new HashMap<>();
FlowFile flowfile = session.get();
...
// Adds the attributes to the flowfile
flowfile = session.putAllAttributes(flowfile, processedAttributes);
session.transfer(flowfile, PROCESSED);
}
我在 m4.4xlarge Amazon ec2 实例上 运行 NiFi 0.7。由于我正在寻求高性能(谁不追求),我正在寻找一种增加线程数量的安全方法。非常感谢任何建议。
提前致谢。
对于某些线程,session.get() 方法可能 return 为 null(尤其是对于多线程/并发任务)。如果因为有可用的流文件而安排了两个或更多线程,则会发生这种情况,然后一个线程获取流文件(通过 session.get()),而下一个线程将变为空。
在您的情况下,您可能不是在读取或写入流文件,而是其他方法(如 session.putAllAttributes())调用流文件上的方法。许多处理器添加一个检查以查看 flowfile == null 并将 return(因为它需要一个流文件来操作),也许这也可以解决您的问题。
我目前正在研究 NiFi 流程,该流程需要实施自定义处理器以对 csv 记录应用转换。
我在我执行的一些基准测试中注意到了这种行为:如果只为每个自定义处理器分配一个线程,则一切正常。由于 java.lang.NullPointerException.
,将更多线程分配给自定义处理器会导致 无法处理会话由于无法用单线程重现错误,我更多地考虑处理流文件的一些问题,或者我不知道的 NiFi 的某些方面。
访问流文件属性执行处理。永远不会读取流文件内容,并在添加一些属性后返回输出流文件。以下是处理器代码相关部分的片段:
@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
// Will hold all the processed attributes
Map<String, String> processedAttributes = new HashMap<>();
FlowFile flowfile = session.get();
...
// Adds the attributes to the flowfile
flowfile = session.putAllAttributes(flowfile, processedAttributes);
session.transfer(flowfile, PROCESSED);
}
我在 m4.4xlarge Amazon ec2 实例上 运行 NiFi 0.7。由于我正在寻求高性能(谁不追求),我正在寻找一种增加线程数量的安全方法。非常感谢任何建议。
提前致谢。
对于某些线程,session.get() 方法可能 return 为 null(尤其是对于多线程/并发任务)。如果因为有可用的流文件而安排了两个或更多线程,则会发生这种情况,然后一个线程获取流文件(通过 session.get()),而下一个线程将变为空。
在您的情况下,您可能不是在读取或写入流文件,而是其他方法(如 session.putAllAttributes())调用流文件上的方法。许多处理器添加一个检查以查看 flowfile == null 并将 return(因为它需要一个流文件来操作),也许这也可以解决您的问题。