Kafka processor does not keep the state of FlowFile attributes
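I updated a few attributes on a FlowFile and published it to Kafka, but when I consume the same message with the ConsumeKafka_2_0 processor, the attributes are gone.
Is this not supported? Do I need to customize this processor?
Looking at the processor source code below, I can see that it already reads attributes from the record and writes them to the FlowFile, so why are they not available on the FlowFile?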
private void writeData(final ProcessSession session, ConsumerRecord<byte[], byte[]> record, final TopicPartition topicPartition) {
    FlowFile flowFile = session.create();
    final BundleTracker tracker = new BundleTracker(record, topicPartition, keyEncoding);
    tracker.incrementRecordCount(1);
    final byte[] value = record.value();
    if (value != null) {
        flowFile = session.write(flowFile, out -> {
            out.write(value);
        });
    }
    flowFile = session.putAllAttributes(flowFile, getAttributes(record));
    tracker.updateFlowFile(flowFile);
    populateAttributes(tracker);
    session.transfer(tracker.flowFile, REL_SUCCESS);
}
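To pass attributes along you have to use Kafka headers. There is no other way to carry them, because attributes are not part of the FlowFile body, and the body is what becomes the body of the Kafka message.
On the publishing side, PublishKafka_2_0 has the following property to specify which attributes are sent as headers: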
static final PropertyDescriptor ATTRIBUTE_NAME_REGEX = new PropertyDescriptor.Builder()
    .name("attribute-name-regex")
    .displayName("Attributes to Send as Headers (Regex)")
    .description("A Regular Expression that is matched against all FlowFile attribute names. "
        + "Any attribute whose name matches the regex will be added to the Kafka messages as a Header. "
        + "If not specified, no FlowFile attributes will be added as headers.")
    .addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR)
    .expressionLanguageSupported(ExpressionLanguageScope.NONE)
    .required(false)
    .build();
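To make the effect of that regex concrete, here is a minimal sketch in plain Java with no NiFi or Kafka dependencies. The class `AttributeHeaderSketch`, the helper `selectHeaders`, and the attribute names are all hypothetical, and the sketch assumes the regex has to match the whole attribute name and that values are sent as UTF-8 bytes. It is not the processor's actual code.

```java
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.regex.Pattern;

class AttributeHeaderSketch {

    // Hypothetical helper: keeps the attributes whose names match the regex and
    // encodes their values as UTF-8 bytes, the form in which Kafka headers carry values.
    static Map<String, byte[]> selectHeaders(Map<String, String> attributes, String nameRegex) {
        Map<String, byte[]> headers = new LinkedHashMap<>();
        if (nameRegex == null) {
            return headers; // property not set: no attribute is sent as a header
        }
        Pattern pattern = Pattern.compile(nameRegex);
        for (Map.Entry<String, String> attribute : attributes.entrySet()) {
            // Assumption: the regex must match the entire attribute name.
            if (pattern.matcher(attribute.getKey()).matches()) {
                headers.put(attribute.getKey(), attribute.getValue().getBytes(StandardCharsets.UTF_8));
            }
        }
        return headers;
    }

    public static void main(String[] args) {
        Map<String, String> attributes = new LinkedHashMap<>();
        attributes.put("filename", "data.json");
        attributes.put("order.id", "42");
        attributes.put("order.region", "eu");

        Map<String, byte[]> headers = selectHeaders(attributes, "order\\..*");
        System.out.println(headers.keySet()); // prints [order.id, order.region]
    }
}
```

With the property left unset (modeled here as `null`), the result is empty, which matches the behavior in the question: the attributes never reach Kafka.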
On the consuming side, ConsumeKafka_2_0 has the following property to specify which header fields are added as attributes:
static final PropertyDescriptor HEADER_NAME_REGEX = new PropertyDescriptor.Builder()
    .name("header-name-regex")
    .displayName("Headers to Add as Attributes (Regex)")
    .description("A Regular Expression that is matched against all message headers. "
        + "Any message header whose name matches the regex will be added to the FlowFile as an Attribute. "
        + "If not specified, no Header values will be added as FlowFile attributes. If two messages have a different value for the same header and that header is selected by "
        + "the provided regex, then those two messages must be added to different FlowFiles. As a result, users should be cautious about using a regex like "
        + "\".*\" if messages are expected to have header values that are unique per message, such as an identifier or timestamp, because it will prevent NiFi from bundling "
        + "the messages together efficiently.")
    .addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR)
    .expressionLanguageSupported(ExpressionLanguageScope.NONE)
    .required(false)
    .build();
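The bundling caveat in that description is easier to see with a small example. The sketch below is plain Java with no NiFi or Kafka dependencies. `HeaderAttributeSketch`, `headersToAttributes`, `message`, and the header names are hypothetical, headers are simplified to a map (real Kafka headers can repeat a key), and values are assumed to be UTF-8 text. It only illustrates the idea that two messages can share a FlowFile when their selected headers produce identical attributes.

```java
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.regex.Pattern;

class HeaderAttributeSketch {

    // Hypothetical helper: converts the headers whose names match the regex
    // into string attributes, decoding each value as UTF-8.
    static Map<String, String> headersToAttributes(Map<String, byte[]> headers, String nameRegex) {
        Map<String, String> attributes = new LinkedHashMap<>();
        if (nameRegex == null) {
            return attributes; // property not set: no header becomes an attribute
        }
        Pattern pattern = Pattern.compile(nameRegex);
        for (Map.Entry<String, byte[]> header : headers.entrySet()) {
            // Assumption: the regex must match the entire header name.
            if (pattern.matcher(header.getKey()).matches()) {
                attributes.put(header.getKey(), new String(header.getValue(), StandardCharsets.UTF_8));
            }
        }
        return attributes;
    }

    // Builds the headers of an illustrative message with a shared and a unique header.
    static Map<String, byte[]> message(String source, String requestId) {
        Map<String, byte[]> headers = new LinkedHashMap<>();
        headers.put("source", source.getBytes(StandardCharsets.UTF_8));
        headers.put("request.id", requestId.getBytes(StandardCharsets.UTF_8));
        return headers;
    }

    public static void main(String[] args) {
        Map<String, byte[]> first = message("web", "a1");
        Map<String, byte[]> second = message("web", "b2");

        // Selecting only "source" gives both messages identical attributes.
        boolean sameWithNarrowRegex = headersToAttributes(first, "source")
                .equals(headersToAttributes(second, "source"));
        // ".*" also selects "request.id", which differs per message.
        boolean sameWithWildcard = headersToAttributes(first, ".*")
                .equals(headersToAttributes(second, ".*"));

        System.out.println(sameWithNarrowRegex); // prints true
        System.out.println(sameWithWildcard);    // prints false
    }
}
```

A narrow regex keeps the attribute sets equal, so the messages could be bundled. The wildcard pulls in the per-message identifier, which is the case the property description warns about.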