NiFi processor task count is very high. What could be the reason?

I wrote a basic custom processor that routes the FlowFile to the "RETRY" relationship and calls penalize.

package nlsn.processors.core.main;

import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

import org.apache.nifi.annotation.behavior.ReadsAttribute;
import org.apache.nifi.annotation.behavior.ReadsAttributes;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessorInitializationContext;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;

@Tags({ "wait", "wait on time"})
@CapabilityDescription("Wait on time")
@SeeAlso({})
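@ReadsAttributes({ @ReadsAttribute(attribute = "_wait_state", description = "If present, the FlowFile has already waited once and is routed to SUCCESS") })
@WritesAttributes({ @WritesAttribute(attribute = "_wait_state", description = "Set to 1 before the FlowFile is penalized and routed to RETRY; removed on SUCCESS") })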
public class CustomWait extends AbstractProcessor {

    public static final Relationship SUCCESS_RELATIONSHIP = new Relationship.Builder()
            .name("SUCCESS").description("well done, carry on").build();

    public static final Relationship FAILURE_RELATIONSHIP = new Relationship.Builder()
            .name("FAILURE").description("fail").build();

    public static final Relationship POINT_TO_SELF_RELATIONSHIP = new Relationship.Builder()
            .name("RETRY").description("point it back to processor").build();

    private List<PropertyDescriptor> descriptors;

    private Set<Relationship> relationships;


    @Override
    protected void init(final ProcessorInitializationContext context) {
        // This processor has no configurable properties; return an empty list rather than null.
        this.descriptors = Collections.emptyList();

        final Set<Relationship> relationships = new HashSet<Relationship>();
        relationships.add(SUCCESS_RELATIONSHIP);
        relationships.add(FAILURE_RELATIONSHIP);
        relationships.add(POINT_TO_SELF_RELATIONSHIP);
        this.relationships = Collections.unmodifiableSet(relationships);
    }

    @Override
    public Set<Relationship> getRelationships() {
        return this.relationships;
    }

    @Override
    public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        return descriptors;
    }

    @OnScheduled
    public void onScheduled(final ProcessContext context) {

    }

    @Override
    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
        final ComponentLog logger = getLogger();
        FlowFile flowFile = session.get();
        if (flowFile != null) {
            logger.info("flow file is not null.");
            String state = flowFile.getAttribute("_wait_state");
            if (state == null || state.isEmpty()) {
                logger.info("\"_wait_state\" attribute is missing, going into WAIT.");
                flowFile = session.putAttribute( flowFile, "_wait_state", "1");
                flowFile = session.penalize(flowFile);
                session.transfer( flowFile, POINT_TO_SELF_RELATIONSHIP );
            } else {
                logger.info("\"_wait_state\" attribute is available, breaking WAIT.");
                flowFile = session.removeAttribute( flowFile, "_wait_state" );
                session.transfer( flowFile, SUCCESS_RELATIONSHIP); 
            }
        } else {
            // session.get() returns null, e.g. while the only queued FlowFile is still penalized.
            //logger.info("flow file is null (bad)!!!.");
        }
    }
}
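To make the two-pass behavior of onTrigger explicit, here is a minimal NiFi-free sketch of its routing decision. It is a model, not NiFi code: the class WaitRoutingModel, the Route enum, and the use of a plain Map in place of a FlowFile's attributes are all hypothetical. The first pass marks the FlowFile and sends it to RETRY (where CustomWait also penalizes it); the second pass, once the penalty has expired, clears the marker and sends it to SUCCESS.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, NiFi-free model of CustomWait.onTrigger's routing decision.
// A plain Map stands in for the FlowFile's attributes; penalization is not modeled here.
class WaitRoutingModel {

    enum Route { RETRY, SUCCESS }

    // Mirrors the if/else in onTrigger:
    // no "_wait_state" -> set it to "1" and route to RETRY (CustomWait also penalizes here);
    // "_wait_state" present -> remove it and route to SUCCESS.
    static Route route(Map<String, String> attributes) {
        String state = attributes.get("_wait_state");
        if (state == null || state.isEmpty()) {
            attributes.put("_wait_state", "1");
            return Route.RETRY;
        }
        attributes.remove("_wait_state");
        return Route.SUCCESS;
    }

    public static void main(String[] args) {
        Map<String, String> attrs = new HashMap<>();
        System.out.println(route(attrs) + " " + attrs); // RETRY {_wait_state=1}
        System.out.println(route(attrs) + " " + attrs); // SUCCESS {}  (after the penalty expires)
    }
}
```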

The code works as expected, and the flow completes in about 30 seconds, as expected. But I'm wondering why the task count (192,569) is so high.

(See the task count on the CustomWait processor.)

  1. What is NiFi running in the background?
  2. Does such a large number actually consume CPU?
  3. If this is bad, how can I fix it?

Thanks

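  1. The NiFi controller schedules the processor to run whenever there is a FlowFile (FF) in its input queue, without checking whether that FF is penalized. In onTrigger, the processor tries to take an FF from the input queue (session.get()). session.get() does not return penalized FFs, so it ends up returning null. That is why the null check is needed, and it is not a bad thing. I assume you haven't changed the Run Schedule, which means the controller will try to run this processor as often as it can. That is what inflates the task count.
  2. It is checking for input to process, so yes, it is using CPU. Whether that is significant depends on how many tasks and processors are running on the system.
  3. It is not bad in itself, but you can reduce it by setting the Run Schedule to a value other than 0.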
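The reported numbers work out to roughly 6,400 triggers per second (192,569 tasks / 30 s), or about 156 microseconds per trigger. The sketch below is a toy, single-threaded model of the scheduling described in the answer, not NiFi's actual scheduler: one FlowFile is penalized at t = 0 for 30 s (NiFi's default Penalty Duration), and the processor is triggered again once the previous task has finished and the Run Schedule interval has elapsed. The class ScheduleSimulation, the virtual clock, and the fixed per-trigger cost (backed out from the numbers above) are assumptions.

```java
// Toy, single-threaded model of timer-driven scheduling while one FlowFile is penalized.
// Hypothetical: the class, the virtual clock and the fixed per-trigger cost are assumptions,
// not NiFi's real scheduler. All times are in microseconds.
class ScheduleSimulation {

    /**
     * Counts onTrigger calls from t = 0 (FlowFile just penalized) up to and including
     * the call whose modeled session.get() finally returns the FlowFile.
     */
    static long countTriggers(long penaltyMicros, long runScheduleMicros, long taskCostMicros) {
        if (runScheduleMicros <= 0 && taskCostMicros <= 0) {
            throw new IllegalArgumentException("time must advance between triggers");
        }
        long now = 0;
        long triggers = 0;
        while (true) {
            triggers++;
            // Modeled session.get(): the FlowFile is returned only once its penalty has expired.
            if (now >= penaltyMicros) {
                return triggers;
            }
            // Empty trigger (session.get() returned null). The next one starts when this task
            // has finished and the Run Schedule interval (start to start) has elapsed.
            now += Math.max(runScheduleMicros, taskCostMicros);
        }
    }

    public static void main(String[] args) {
        long penalty = 30_000_000L; // 30 s, NiFi's default Penalty Duration
        long cost = 156L;           // assumed cost per trigger, about 30 s / 192,569 tasks
        System.out.println("Run Schedule 0 sec: " + countTriggers(penalty, 0L, cost));         // 192309
        System.out.println("Run Schedule 1 sec: " + countTriggers(penalty, 1_000_000L, cost)); // 31
    }
}
```

Under these assumptions, a 0 sec Run Schedule gives a task count in the same range as the question's (by construction, since the cost was derived from it), while a 1 sec Run Schedule drops it to 31. The trade-off in this model is latency: with a non-zero Run Schedule, the FlowFile can be picked up as much as one interval after its penalty expires.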