NiFi 自定义处理器表达式语言

NiFi custom processor expression language

我正在尝试在 Apache NiFi 中创建一个自定义处理器,它可以将 attribute/string 添加到流文件内容中的 JSON 对象。目前,当我只使用字符串时它可以工作,但是当我使用 NiFi 的表达式语言时它不起作用,尽管我的代码支持它。

表达语言是 100% 正确的,因为它在另一个处理器中工作,我也尝试了不同的属性以确保它不是该属性。

属性:

public static final PropertyDescriptor ADD_ATTRIBUTE = new PropertyDescriptor
        .Builder().name("Add Attribute")
        .description("Example Property")
        .required(true)
        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
        .expressionLanguageSupported(true)
        .build();

稍后在我的代码中,当我想要获取值并放入我使用的 JSON 对象时:

jsonObject.put("hostname", context.getProperty(ADD_ATTRIBUTE).evaluateAttributeExpressions().getValue());

我还进行了单元测试,当我将文本值分配给 testrunner.setProperty 时它起作用了。但是我不知道如何将属性分配给测试运行器或如何在测试中使用表达式语言。

提前感谢您提出任何建议或解决方案!

我也会把 Hortonworks Community Connection 的回答放在这里 FWIW:

如果表达式引用流文件上的属性,您需要将流文件的引用传递到 evaluateAttributeExpressions 中:

FlowFile flowFile = session.get();
jsonObject.put("hostname", context.getProperty(ADD_ATTRIBUTE).evaluateAttributeExpressions(flowFile).getValue());

如果 属性 包含属性名称(而不是包含属性名称的表达式)并且您需要流文件中的值:

jsonObject.put("hostname", flowFile.getAttribute(context.getProperty(ADD_ATTRIBUTE).getValue()));

如果属性值本身包含Expression Language,而您想对其求值,请看下面class:

org.apache.nifi.attribute.expression.language.Query

关于测试...

假设您正在根据传入的 FlowFile (evaluateAttributeExpressions(flowFile)) 评估表达式语言,那么您可以执行以下操作:

runner.setProperty(ADD_ATTRIBUTE, "${my.attribute}");

然后创建一个属性映射,里面有my.attribute:

final Map<String,String> attributes = new HashMap<>(); attributes.put("my.attribute", myAttribute);

然后将一些具有以下属性的内容加入队列:

runner.enqueue(fileIn, attributes); runner.run();

来自代码库的示例:

https://github.com/apache/nifi/blob/1e56de9521e4bc0752b419ffc7d62e096db1c389/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/test/java/org/apache/nifi/processors/solr/TestPutSolrContentStream.java#L243