如何将 FlowFile 属性写入 FlowFile 内容中的 Avro 元数据?
How can I write FlowFile attributes to Avro metadata inside the FlowFile's content?
我正在创建 FlowFiles,这些文件在被 ExecuteSql
处理器发出后在下游进行操作和拆分。我用我想放入每个 FlowFile 内容中包含的 Avro 元数据的数据填充了 FlowFiles 的属性。
我该怎么做?
我试过使用配置有 AvroReader
和 AvroRecordSetWriter
的 UpdateRecord
处理器以及带有 /canary
密钥的 属性将 FlowFile 属性写入 Avro 文档中的那个键某处。不过,它不会出现在输出中的任何地方。
将 Avro 数据中的记录移动到一个子项并让元数据部分成为记录数据的一部分是可以接受的。不过,我不想这样做,因为它似乎不是正确的解决方案,而且它听起来比简单地修改 Avro 元数据要复杂得多。
记录感知处理器(和 Readers/Writers)不感知元数据,这意味着它们目前(从 NiFi 1.5.0 开始)不能以任何方式对元数据进行操作(检查、创建、删除、等),因此 UpdateRecord 本身不适用于元数据。使用您的 /canary 属性 键,它将尝试在您的 Avro 记录中插入一个名为 canary 的顶级字段,并且应该具有您指定的值。但是我相信您的输出模式需要在顶层添加 canary 字段,否则它可能会被忽略(我对此并不肯定,您可以检查输出模式以查看它是否自动添加)。
目前没有可以显式更新 Avro 元数据的 NiFi 处理器(MergeContent 做了一些关于将各种 Avro 文件合并在一起的事情,但是你不能选择设置一个值,例如)。但是,我有一个未完善的 Groovy 脚本,您可以在 ExecuteScript 中使用它来将元数据添加到 NiFi 1.5.0+ 中的 Avro 文件。在 ExecuteScript 中,您可以将语言设置为 Groovy 并将以下内容设置为脚本主体,然后将用户定义的(又名 "dynamic" 属性)添加到 ExecuteScript,其中键将是元数据键,并且评估值(属性支持表达式语言)将是值:
@Grab('org.apache.avro:avro:1.8.1')
import org.apache.avro.*
import org.apache.avro.file.*
import org.apache.avro.generic.*
def flowFile = session.get()
if(!flowFile) return
try {
// Save off dynamic property values for metadata key/values later
def metadata = [:]
context.properties.findAll {e -> e.key.dynamic}.each {k,v -> metadata.put(k.name, context.getProperty(k).evaluateAttributeExpressions(flowFile).value.bytes)}
flowFile = session.write(flowFile, {inStream, outStream ->
DataFileStream<GenericRecord> reader = new DataFileStream<>(inStream, new GenericDatumReader<GenericRecord>())
DataFileWriter<GenericRecord> writer = new DataFileWriter<>(new GenericDatumWriter<GenericRecord>())
def schema = reader.schema
def inputCodec = reader.getMetaString(DataFileConstants.CODEC) ?: DataFileConstants.NULL_CODEC
// Forward the existing metadata to the output
reader.metaKeys.each { key ->
if (!DataFileWriter.isReservedMeta(key)) {
byte[] metadatum = reader.getMeta(key)
writer.setMeta(key, metadatum)
}
}
// For each dynamic property, set the key/value pair as Avro metadata
metadata.each {k,v -> writer.setMeta(k,v)}
writer.setCodec(CodecFactory.fromString(inputCodec))
writer.create(schema, outStream)
writer.appendAllFrom(reader, false)
} as StreamCallback)
session.transfer(flowFile, REL_SUCCESS)
} catch(e) {
log.error('Error adding Avro metadata, penalizing flow file and routing to failure', e)
flowFile = session.penalize(flowFile)
session.transfer(flowFile, REL_FAILURE)
}
请注意,此脚本可以与 1.5.0 之前的 NiFi 版本一起使用,但直到 1.5.0 才支持顶部的@Grab,因此您必须将 Avro 及其依赖项下载到一个平面中文件夹,并指向 ExecuteScript 的模块目录 属性 中的那个。
我正在创建 FlowFiles,这些文件在被 ExecuteSql
处理器发出后在下游进行操作和拆分。我用我想放入每个 FlowFile 内容中包含的 Avro 元数据的数据填充了 FlowFiles 的属性。
我该怎么做?
我试过使用配置有 AvroReader
和 AvroRecordSetWriter
的 UpdateRecord
处理器以及带有 /canary
密钥的 属性将 FlowFile 属性写入 Avro 文档中的那个键某处。不过,它不会出现在输出中的任何地方。
将 Avro 数据中的记录移动到一个子项并让元数据部分成为记录数据的一部分是可以接受的。不过,我不想这样做,因为它似乎不是正确的解决方案,而且它听起来比简单地修改 Avro 元数据要复杂得多。
记录感知处理器(和 Readers/Writers)不感知元数据,这意味着它们目前(从 NiFi 1.5.0 开始)不能以任何方式对元数据进行操作(检查、创建、删除、等),因此 UpdateRecord 本身不适用于元数据。使用您的 /canary 属性 键,它将尝试在您的 Avro 记录中插入一个名为 canary 的顶级字段,并且应该具有您指定的值。但是我相信您的输出模式需要在顶层添加 canary 字段,否则它可能会被忽略(我对此并不肯定,您可以检查输出模式以查看它是否自动添加)。
目前没有可以显式更新 Avro 元数据的 NiFi 处理器(MergeContent 做了一些关于将各种 Avro 文件合并在一起的事情,但是你不能选择设置一个值,例如)。但是,我有一个未完善的 Groovy 脚本,您可以在 ExecuteScript 中使用它来将元数据添加到 NiFi 1.5.0+ 中的 Avro 文件。在 ExecuteScript 中,您可以将语言设置为 Groovy 并将以下内容设置为脚本主体,然后将用户定义的(又名 "dynamic" 属性)添加到 ExecuteScript,其中键将是元数据键,并且评估值(属性支持表达式语言)将是值:
@Grab('org.apache.avro:avro:1.8.1')
import org.apache.avro.*
import org.apache.avro.file.*
import org.apache.avro.generic.*
def flowFile = session.get()
if(!flowFile) return
try {
// Save off dynamic property values for metadata key/values later
def metadata = [:]
context.properties.findAll {e -> e.key.dynamic}.each {k,v -> metadata.put(k.name, context.getProperty(k).evaluateAttributeExpressions(flowFile).value.bytes)}
flowFile = session.write(flowFile, {inStream, outStream ->
DataFileStream<GenericRecord> reader = new DataFileStream<>(inStream, new GenericDatumReader<GenericRecord>())
DataFileWriter<GenericRecord> writer = new DataFileWriter<>(new GenericDatumWriter<GenericRecord>())
def schema = reader.schema
def inputCodec = reader.getMetaString(DataFileConstants.CODEC) ?: DataFileConstants.NULL_CODEC
// Forward the existing metadata to the output
reader.metaKeys.each { key ->
if (!DataFileWriter.isReservedMeta(key)) {
byte[] metadatum = reader.getMeta(key)
writer.setMeta(key, metadatum)
}
}
// For each dynamic property, set the key/value pair as Avro metadata
metadata.each {k,v -> writer.setMeta(k,v)}
writer.setCodec(CodecFactory.fromString(inputCodec))
writer.create(schema, outStream)
writer.appendAllFrom(reader, false)
} as StreamCallback)
session.transfer(flowFile, REL_SUCCESS)
} catch(e) {
log.error('Error adding Avro metadata, penalizing flow file and routing to failure', e)
flowFile = session.penalize(flowFile)
session.transfer(flowFile, REL_FAILURE)
}
请注意,此脚本可以与 1.5.0 之前的 NiFi 版本一起使用,但直到 1.5.0 才支持顶部的@Grab,因此您必须将 Avro 及其依赖项下载到一个平面中文件夹,并指向 ExecuteScript 的模块目录 属性 中的那个。