NiFi 处理器的任务计数非常高,可能是什么原因?
NiFi processor task count is very high, what could be the reason?
我编写了一个简单的自定义处理器,它会对流文件调用 penalize,并将其发送到 "Retry" 关系。
package nlsn.processors.core.main;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import org.apache.nifi.annotation.behavior.ReadsAttribute;
import org.apache.nifi.annotation.behavior.ReadsAttributes;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessorInitializationContext;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
/**
 * Holds each incoming FlowFile back for one penalty period before releasing it.
 *
 * <p>On the first pass a FlowFile has no {@code _wait_state} attribute. The processor sets it to
 * {@code "1"}, penalizes the FlowFile and routes it to {@code RETRY}, which is meant to be
 * connected back to this processor. On the next pass the attribute is present, so it is removed
 * and the FlowFile is routed to {@code SUCCESS}.
 *
 * <p>While the only queued FlowFile is penalized, the framework may still trigger this processor.
 * Each of those invocations finds nothing to process ({@code session.get()} returns {@code null})
 * but is still counted as a task. With a Run Schedule of 0 sec this makes the task count grow
 * very quickly. Configure a non-zero Run Schedule to reduce it.
 */
@Tags({ "wait", "wait on time"})
@CapabilityDescription("Wait on time")
@SeeAlso({})
@ReadsAttributes({ @ReadsAttribute(attribute = "_wait_state",
        description = "If present and non-empty, the FlowFile has already waited once and is routed to SUCCESS.") })
@WritesAttributes({ @WritesAttribute(attribute = "_wait_state",
        description = "Set to \"1\" when the FlowFile is penalized and routed to RETRY; removed again when it is routed to SUCCESS.") })
public class CustomWait extends AbstractProcessor {

    /** FlowFiles that have completed their wait. */
    public static final Relationship SUCCESS_RELATIONSHIP = new Relationship.Builder()
            .name("SUCCESS").description("well done, carry on").build();

    /**
     * Declared but never routed to by {@link #onTrigger}.
     * NOTE(review): the trailing '.' in the name looks like a typo. It is kept because renaming a
     * relationship breaks connections already saved in existing flows.
     */
    public static final Relationship FAILURE_RELATIONSHIP = new Relationship.Builder()
            .name("FAILURE.").description("fail").build();

    /** Penalized FlowFiles that still have to wait; connect this back to the processor itself. */
    public static final Relationship POINT_TO_SELF_RELATIONSHIP = new Relationship.Builder()
            .name("RETRY").description("point it back to processor").build();

    /** Attribute used to remember that a FlowFile has already been penalized once. */
    private static final String WAIT_STATE_ATTRIBUTE = "_wait_state";

    // The processor has no configurable properties. This must be an empty list rather than null,
    // because framework code iterates over / wraps the value returned by
    // getSupportedPropertyDescriptors().
    private List<PropertyDescriptor> descriptors = Collections.emptyList();

    private Set<Relationship> relationships;

    @Override
    protected void init(final ProcessorInitializationContext context) {
        final Set<Relationship> relationships = new HashSet<Relationship>();
        relationships.add(SUCCESS_RELATIONSHIP);
        relationships.add(FAILURE_RELATIONSHIP);
        relationships.add(POINT_TO_SELF_RELATIONSHIP);
        this.relationships = Collections.unmodifiableSet(relationships);
    }

    @Override
    public Set<Relationship> getRelationships() {
        return this.relationships;
    }

    /** Returns an empty list: this processor exposes no properties. Never returns null. */
    @Override
    public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        return descriptors;
    }

    @OnScheduled
    public void onScheduled(final ProcessContext context) {
        // Nothing to set up.
    }

    /**
     * Routes a FlowFile to RETRY (penalized) on its first visit and to SUCCESS on its second.
     *
     * @param context the process context (unused)
     * @param session the session used to fetch, modify and transfer the FlowFile
     * @throws ProcessException if the framework fails while operating on the session
     */
    @Override
    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
        FlowFile flowFile = session.get();
        if (flowFile == null) {
            // Nothing available right now, e.g. the only queued FlowFile is still penalized.
            // This empty invocation still counts as a task; the Run Schedule controls how often
            // it happens.
            return;
        }

        // Per-FlowFile messages are logged at DEBUG so a busy flow does not flood the app log.
        final ComponentLog logger = getLogger();
        logger.debug("flow file is not null.");

        final String state = flowFile.getAttribute(WAIT_STATE_ATTRIBUTE);
        if (state == null || state.isEmpty()) {
            logger.debug("\"_wait_state\" attribute is missing, going into WAIT.");
            flowFile = session.putAttribute(flowFile, WAIT_STATE_ATTRIBUTE, "1");
            // Penalizing keeps the FlowFile out of session.get() until the penalty expires.
            flowFile = session.penalize(flowFile);
            session.transfer(flowFile, POINT_TO_SELF_RELATIONSHIP);
        } else {
            logger.debug("\"_wait_state\" attribute is available, breaking WAIT.");
            flowFile = session.removeAttribute(flowFile, WAIT_STATE_ATTRIBUTE);
            session.transfer(flowFile, SUCCESS_RELATIONSHIP);
        }
    }
}
代码按预期工作,流程也如预期在 30 秒内完成。但我想知道为什么任务计数(192,569)会如此之高?
(参见 CustomWait 处理器任务计数)
- NiFi 在后台运行的是什么?
- 这么大的数字真的占用了 CPU 吗?
- 如果这不好,如何解决?
谢谢
- 只要输入队列中有流文件(FF)可供处理,NiFi 控制器就会调度处理器运行,而不会检查该 FF 是否处于惩罚状态。在处理器的 onTrigger 中,它会尝试通过
session.get()
从输入队列中获取 FF。但 session.get()
不会返回任何处于惩罚状态的 FF,因此最终返回 null。这就是需要检查 FF 是否为 null 的原因,这种检查本身没有问题。我猜您没有修改运行计划(Run Schedule),这意味着控制器会尽可能频繁地运行该处理器,从而导致任务计数膨胀。
- 它在不断检查是否有可处理的输入,因此确实会占用 CPU。占用多少资源,取决于系统上正在运行的任务和处理器的数量。
- 这本身并不是坏事,但可以通过将运行计划(Run Schedule)设置为非 0 的值来减少任务数。
我编写了一个简单的自定义处理器,它会对流文件调用 penalize,并将其发送到 "Retry" 关系。
package nlsn.processors.core.main;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import org.apache.nifi.annotation.behavior.ReadsAttribute;
import org.apache.nifi.annotation.behavior.ReadsAttributes;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessorInitializationContext;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
/**
 * Holds each incoming FlowFile back for one penalty period before releasing it.
 *
 * <p>On the first pass a FlowFile has no {@code _wait_state} attribute. The processor sets it to
 * {@code "1"}, penalizes the FlowFile and routes it to {@code RETRY}, which is meant to be
 * connected back to this processor. On the next pass the attribute is present, so it is removed
 * and the FlowFile is routed to {@code SUCCESS}.
 *
 * <p>While the only queued FlowFile is penalized, the framework may still trigger this processor.
 * Each of those invocations finds nothing to process ({@code session.get()} returns {@code null})
 * but is still counted as a task. With a Run Schedule of 0 sec this makes the task count grow
 * very quickly. Configure a non-zero Run Schedule to reduce it.
 */
@Tags({ "wait", "wait on time"})
@CapabilityDescription("Wait on time")
@SeeAlso({})
@ReadsAttributes({ @ReadsAttribute(attribute = "_wait_state",
        description = "If present and non-empty, the FlowFile has already waited once and is routed to SUCCESS.") })
@WritesAttributes({ @WritesAttribute(attribute = "_wait_state",
        description = "Set to \"1\" when the FlowFile is penalized and routed to RETRY; removed again when it is routed to SUCCESS.") })
public class CustomWait extends AbstractProcessor {

    /** FlowFiles that have completed their wait. */
    public static final Relationship SUCCESS_RELATIONSHIP = new Relationship.Builder()
            .name("SUCCESS").description("well done, carry on").build();

    /**
     * Declared but never routed to by {@link #onTrigger}.
     * NOTE(review): the trailing '.' in the name looks like a typo. It is kept because renaming a
     * relationship breaks connections already saved in existing flows.
     */
    public static final Relationship FAILURE_RELATIONSHIP = new Relationship.Builder()
            .name("FAILURE.").description("fail").build();

    /** Penalized FlowFiles that still have to wait; connect this back to the processor itself. */
    public static final Relationship POINT_TO_SELF_RELATIONSHIP = new Relationship.Builder()
            .name("RETRY").description("point it back to processor").build();

    /** Attribute used to remember that a FlowFile has already been penalized once. */
    private static final String WAIT_STATE_ATTRIBUTE = "_wait_state";

    // The processor has no configurable properties. This must be an empty list rather than null,
    // because framework code iterates over / wraps the value returned by
    // getSupportedPropertyDescriptors().
    private List<PropertyDescriptor> descriptors = Collections.emptyList();

    private Set<Relationship> relationships;

    @Override
    protected void init(final ProcessorInitializationContext context) {
        final Set<Relationship> relationships = new HashSet<Relationship>();
        relationships.add(SUCCESS_RELATIONSHIP);
        relationships.add(FAILURE_RELATIONSHIP);
        relationships.add(POINT_TO_SELF_RELATIONSHIP);
        this.relationships = Collections.unmodifiableSet(relationships);
    }

    @Override
    public Set<Relationship> getRelationships() {
        return this.relationships;
    }

    /** Returns an empty list: this processor exposes no properties. Never returns null. */
    @Override
    public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        return descriptors;
    }

    @OnScheduled
    public void onScheduled(final ProcessContext context) {
        // Nothing to set up.
    }

    /**
     * Routes a FlowFile to RETRY (penalized) on its first visit and to SUCCESS on its second.
     *
     * @param context the process context (unused)
     * @param session the session used to fetch, modify and transfer the FlowFile
     * @throws ProcessException if the framework fails while operating on the session
     */
    @Override
    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
        FlowFile flowFile = session.get();
        if (flowFile == null) {
            // Nothing available right now, e.g. the only queued FlowFile is still penalized.
            // This empty invocation still counts as a task; the Run Schedule controls how often
            // it happens.
            return;
        }

        // Per-FlowFile messages are logged at DEBUG so a busy flow does not flood the app log.
        final ComponentLog logger = getLogger();
        logger.debug("flow file is not null.");

        final String state = flowFile.getAttribute(WAIT_STATE_ATTRIBUTE);
        if (state == null || state.isEmpty()) {
            logger.debug("\"_wait_state\" attribute is missing, going into WAIT.");
            flowFile = session.putAttribute(flowFile, WAIT_STATE_ATTRIBUTE, "1");
            // Penalizing keeps the FlowFile out of session.get() until the penalty expires.
            flowFile = session.penalize(flowFile);
            session.transfer(flowFile, POINT_TO_SELF_RELATIONSHIP);
        } else {
            logger.debug("\"_wait_state\" attribute is available, breaking WAIT.");
            flowFile = session.removeAttribute(flowFile, WAIT_STATE_ATTRIBUTE);
            session.transfer(flowFile, SUCCESS_RELATIONSHIP);
        }
    }
}
代码按预期工作,流程也如预期在 30 秒内完成。但我想知道为什么任务计数(192,569)会如此之高?
(参见 CustomWait 处理器任务计数)
- NiFi 在后台运行的是什么?
- 这么大的数字真的占用了 CPU 吗?
- 如果这不好,如何解决?
谢谢
- 只要输入队列中有流文件(FF)可供处理,NiFi 控制器就会调度处理器运行,而不会检查该 FF 是否处于惩罚状态。在处理器的 onTrigger 中,它会尝试通过
session.get()
从输入队列中获取 FF。但 session.get()
不会返回任何处于惩罚状态的 FF,因此最终返回 null。这就是需要检查 FF 是否为 null 的原因,这种检查本身没有问题。我猜您没有修改运行计划(Run Schedule),这意味着控制器会尽可能频繁地运行该处理器,从而导致任务计数膨胀。
- 它在不断检查是否有可处理的输入,因此确实会占用 CPU。占用多少资源,取决于系统上正在运行的任务和处理器的数量。
- 这本身并不是坏事,但可以通过将运行计划(Run Schedule)设置为非 0 的值来减少任务数。