NiFi 自定义处理器(Custom Processor)如何将结果写入内容或属性
NiFi custom processor: how to write the result to the FlowFile content or an attribute
我编写了一个简单的自定义处理器,将两个数字相加然后显示结果。
但我不知道如何在 Flowfile 内容或属性中显示结果。
在 Input Value 1 和 Input Value 2 属性中填入值并运行处理器后,输出的 FlowFile 内容为空。
/**
 * Example NiFi processor that adds two configured integers and writes the sum,
 * as decimal text, to the content of each incoming FlowFile.
 *
 * <p>Both operands come from the required processor properties
 * {@link #NUMBER_1} and {@link #NUMBER_2}; every processed FlowFile is routed
 * to {@link #REL_SUCCESS}.
 */
@Tags({"example"})
@CapabilityDescription("Provide a description")
@SeeAlso({})
@ReadsAttributes({@ReadsAttribute(attribute="", description="")})
@WritesAttributes({@WritesAttribute(attribute="", description="")})
public class MyProcessor extends AbstractProcessor {

    /** First operand of the addition; must be a valid integer. */
    public static final PropertyDescriptor NUMBER_1 = new PropertyDescriptor
            .Builder().name("Input Value 1")
            .displayName("Input Value 1")
            .description("Enter the input value 1 to perform addition operation")
            .required(true)
            // Reject non-numeric values at configuration time instead of failing in onTrigger.
            .addValidator(StandardValidators.INTEGER_VALIDATOR)
            .build();

    /** Second operand of the addition; must be a valid integer. */
    public static final PropertyDescriptor NUMBER_2 = new PropertyDescriptor
            .Builder().name("Input Value 2")
            .displayName("Input Value 2")
            .description("Enter the input value 2 to perform addition operation")
            .required(true)
            // Reject non-numeric values at configuration time instead of failing in onTrigger.
            .addValidator(StandardValidators.INTEGER_VALIDATOR)
            .build();

    /** The only relationship: every FlowFile handled by onTrigger is sent here. */
    public static final Relationship REL_SUCCESS = new Relationship.Builder()
            .name("Success")
            .description("All created FlowFiles are routed to this relationship")
            .build();

    private List<PropertyDescriptor> descriptors;
    private Set<Relationship> relationships;

    /**
     * Builds the immutable property and relationship collections returned by
     * the accessor methods below.
     */
    @Override
    protected void init(final ProcessorInitializationContext context) {
        final List<PropertyDescriptor> descriptors = new ArrayList<PropertyDescriptor>();
        descriptors.add(NUMBER_1);
        descriptors.add(NUMBER_2);
        this.descriptors = Collections.unmodifiableList(descriptors);

        final Set<Relationship> relationships = new HashSet<Relationship>();
        relationships.add(REL_SUCCESS);
        this.relationships = Collections.unmodifiableSet(relationships);
    }

    /** @return the unmodifiable set containing {@link #REL_SUCCESS} */
    @Override
    public Set<Relationship> getRelationships() {
        return this.relationships;
    }

    /** @return the unmodifiable list containing {@link #NUMBER_1} and {@link #NUMBER_2} */
    @Override
    public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        return descriptors;
    }

    /** Scheduling hook; intentionally empty because there is nothing to prepare. */
    @OnScheduled
    public void onScheduled(final ProcessContext context) {
    }

    /**
     * Takes one FlowFile from the session, writes the sum of the two configured
     * numbers to its content as UTF-8 decimal text, and transfers it to
     * {@link #REL_SUCCESS}. Returns without doing anything when no FlowFile is
     * available.
     *
     * @param context gives access to the configured property values
     * @param session session used to get, rewrite and transfer the FlowFile
     * @throws ProcessException declared by the overridden method
     */
    @Override
    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
        FlowFile flowFile = session.get();
        if (flowFile == null) {
            return;
        }

        final int num1 = Integer.parseInt(context.getProperty(NUMBER_1).getValue());
        final int num2 = Integer.parseInt(context.getProperty(NUMBER_2).getValue());
        // Add as long so that two large ints cannot silently wrap around.
        final String output = String.valueOf((long) num1 + num2);
        final byte[] outputBytes = output.getBytes(java.nio.charset.StandardCharsets.UTF_8);

        flowFile = session.write(flowFile, new StreamCallback() {
            @Override
            public void process(InputStream in, OutputStream out) throws IOException {
                // Write the textual digits. OutputStream.write(int) would emit a single
                // raw byte (the low 8 bits of the sum), which is not readable content.
                out.write(outputBytes);
            }
        });

        session.transfer(flowFile, REL_SUCCESS);
    }
}
如何在流文件内容或流文件属性中显示结果?
要创建属性,请尝试这样的操作,
// Example fragment: attach values as FlowFile attributes and replace the FlowFile content.
// Assumes `session`, a non-final local `flowFile`, and REL_SUCCESS are already in scope
// (for instance inside onTrigger) — adjust the names to match the surrounding processor.
final AtomicInteger lineCountFromFile = new AtomicInteger(0);
final AtomicInteger fieldCountInHeader = new AtomicInteger(0);
final AtomicInteger validLineCount = new AtomicInteger(0);
final AtomicReference<String> corruptLineNos = new AtomicReference<>();
final AtomicReference<String> flowFileContent = new AtomicReference<>();

corruptLineNos.set("Attribute Value");
String result = "Sample flowfile content";
flowFileContent.set(result);

// Attribute values are always strings, so every metric is converted explicitly.
Map<String, String> metricAttributes = new HashMap<>();
metricAttributes.put("line.count.from.file", String.valueOf(lineCountFromFile.get()));
metricAttributes.put("field.count.in.header", String.valueOf(fieldCountInHeader.get()));
metricAttributes.put("valid.line.count", String.valueOf(validLineCount.get()));
metricAttributes.put("corrupt.line.nos", String.valueOf(corruptLineNos.get()));

// Keep the FlowFile reference returned by each session call and use it for the next one.
flowFile = session.putAllAttributes(flowFile, metricAttributes);
flowFile = session.write(flowFile, new OutputStreamCallback() {
    @Override
    public void process(OutputStream out) throws IOException {
        // Explicit charset so the content does not depend on the platform default.
        out.write(flowFileContent.get().getBytes(java.nio.charset.StandardCharsets.UTF_8));
    }
});
session.transfer(flowFile, REL_SUCCESS);
我编写了一个简单的自定义处理器,将两个数字相加然后显示结果。但我不知道如何在 FlowFile 内容或属性中显示结果。在 Input Value 1 和 Input Value 2 属性中填入值并运行处理器后,输出的 FlowFile 内容为空。
/**
 * Example NiFi processor that adds two configured integers and writes the sum,
 * as decimal text, to the content of each incoming FlowFile.
 *
 * <p>Both operands come from the required processor properties
 * {@link #NUMBER_1} and {@link #NUMBER_2}; every processed FlowFile is routed
 * to {@link #REL_SUCCESS}.
 */
@Tags({"example"})
@CapabilityDescription("Provide a description")
@SeeAlso({})
@ReadsAttributes({@ReadsAttribute(attribute="", description="")})
@WritesAttributes({@WritesAttribute(attribute="", description="")})
public class MyProcessor extends AbstractProcessor {

    /** First operand of the addition; must be a valid integer. */
    public static final PropertyDescriptor NUMBER_1 = new PropertyDescriptor
            .Builder().name("Input Value 1")
            .displayName("Input Value 1")
            .description("Enter the input value 1 to perform addition operation")
            .required(true)
            // Reject non-numeric values at configuration time instead of failing in onTrigger.
            .addValidator(StandardValidators.INTEGER_VALIDATOR)
            .build();

    /** Second operand of the addition; must be a valid integer. */
    public static final PropertyDescriptor NUMBER_2 = new PropertyDescriptor
            .Builder().name("Input Value 2")
            .displayName("Input Value 2")
            .description("Enter the input value 2 to perform addition operation")
            .required(true)
            // Reject non-numeric values at configuration time instead of failing in onTrigger.
            .addValidator(StandardValidators.INTEGER_VALIDATOR)
            .build();

    /** The only relationship: every FlowFile handled by onTrigger is sent here. */
    public static final Relationship REL_SUCCESS = new Relationship.Builder()
            .name("Success")
            .description("All created FlowFiles are routed to this relationship")
            .build();

    private List<PropertyDescriptor> descriptors;
    private Set<Relationship> relationships;

    /**
     * Builds the immutable property and relationship collections returned by
     * the accessor methods below.
     */
    @Override
    protected void init(final ProcessorInitializationContext context) {
        final List<PropertyDescriptor> descriptors = new ArrayList<PropertyDescriptor>();
        descriptors.add(NUMBER_1);
        descriptors.add(NUMBER_2);
        this.descriptors = Collections.unmodifiableList(descriptors);

        final Set<Relationship> relationships = new HashSet<Relationship>();
        relationships.add(REL_SUCCESS);
        this.relationships = Collections.unmodifiableSet(relationships);
    }

    /** @return the unmodifiable set containing {@link #REL_SUCCESS} */
    @Override
    public Set<Relationship> getRelationships() {
        return this.relationships;
    }

    /** @return the unmodifiable list containing {@link #NUMBER_1} and {@link #NUMBER_2} */
    @Override
    public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        return descriptors;
    }

    /** Scheduling hook; intentionally empty because there is nothing to prepare. */
    @OnScheduled
    public void onScheduled(final ProcessContext context) {
    }

    /**
     * Takes one FlowFile from the session, writes the sum of the two configured
     * numbers to its content as UTF-8 decimal text, and transfers it to
     * {@link #REL_SUCCESS}. Returns without doing anything when no FlowFile is
     * available.
     *
     * @param context gives access to the configured property values
     * @param session session used to get, rewrite and transfer the FlowFile
     * @throws ProcessException declared by the overridden method
     */
    @Override
    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
        FlowFile flowFile = session.get();
        if (flowFile == null) {
            return;
        }

        final int num1 = Integer.parseInt(context.getProperty(NUMBER_1).getValue());
        final int num2 = Integer.parseInt(context.getProperty(NUMBER_2).getValue());
        // Add as long so that two large ints cannot silently wrap around.
        final String output = String.valueOf((long) num1 + num2);
        final byte[] outputBytes = output.getBytes(java.nio.charset.StandardCharsets.UTF_8);

        flowFile = session.write(flowFile, new StreamCallback() {
            @Override
            public void process(InputStream in, OutputStream out) throws IOException {
                // Write the textual digits. OutputStream.write(int) would emit a single
                // raw byte (the low 8 bits of the sum), which is not readable content.
                out.write(outputBytes);
            }
        });

        session.transfer(flowFile, REL_SUCCESS);
    }
}
如何在流文件内容或流文件属性中显示结果?
要创建属性,请尝试这样的操作,
// Example fragment: attach values as FlowFile attributes and replace the FlowFile content.
// Assumes `session`, a non-final local `flowFile`, and REL_SUCCESS are already in scope
// (for instance inside onTrigger) — adjust the names to match the surrounding processor.
final AtomicInteger lineCountFromFile = new AtomicInteger(0);
final AtomicInteger fieldCountInHeader = new AtomicInteger(0);
final AtomicInteger validLineCount = new AtomicInteger(0);
final AtomicReference<String> corruptLineNos = new AtomicReference<>();
final AtomicReference<String> flowFileContent = new AtomicReference<>();

corruptLineNos.set("Attribute Value");
String result = "Sample flowfile content";
flowFileContent.set(result);

// Attribute values are always strings, so every metric is converted explicitly.
Map<String, String> metricAttributes = new HashMap<>();
metricAttributes.put("line.count.from.file", String.valueOf(lineCountFromFile.get()));
metricAttributes.put("field.count.in.header", String.valueOf(fieldCountInHeader.get()));
metricAttributes.put("valid.line.count", String.valueOf(validLineCount.get()));
metricAttributes.put("corrupt.line.nos", String.valueOf(corruptLineNos.get()));

// Keep the FlowFile reference returned by each session call and use it for the next one.
flowFile = session.putAllAttributes(flowFile, metricAttributes);
flowFile = session.write(flowFile, new OutputStreamCallback() {
    @Override
    public void process(OutputStream out) throws IOException {
        // Explicit charset so the content does not depend on the platform default.
        out.write(flowFileContent.get().getBytes(java.nio.charset.StandardCharsets.UTF_8));
    }
});
session.transfer(flowFile, REL_SUCCESS);