如何在无界管道中访问 DoFn 中的文件名
How to access file name within a DoFn in an unbounded pipeline
我正在寻找一种方法来访问在 DoFn 中进行数据转换期间正在处理的文件的名称。
我的流水线如下图:
Pipeline p = Pipeline.create(options);
p.apply(FileIO.match()
.filepattern(options.getInput())
.continuously(Duration.standardSeconds(5),
Watch.Growth.<String>never()))
.apply(FileIO.readMatches()
.withCompression(Compression.GZIP))
.apply(XmlIO.<MyString>readFiles()
.withRootElement("root")
.withRecordElement("record")
.withRecordClass(MyString.class))//<-- This only returns the contents of the file
.apply(ParDo.of(new ProcessRecord()))//<-- I need to access file name here
.apply(ParDo.of(new FormatRecord()))
.apply(Window.<String>into(FixedWindows.of(Duration.standardSeconds(5))))
.apply(new CustomWrite(options));
处理的每个文件都是一个 XML 文档。在处理内容时,我也需要访问正在处理的文件的名称以包含在转换后的记录中。
有办法实现吗?
这个 post 有一个类似的问题,但由于我正在尝试使用 XmlIO,所以我还没有找到访问文件元数据的方法。
下面是我在网上找到的方法,不知道有没有办法在上面介绍的pipeline中使用。
p.apply(FileIO.match()
.filepattern(options.getInput())
.continuously(Duration.standardSeconds(5),
Watch.Growth.<String>never()))//File Metadata
.apply(FileIO.readMatches()
.withCompression(Compression.GZIP))//Readable Files
.apply(MapElements
.into(TypeDescriptors.kvs(TypeDescriptors.strings(),new TypeDescriptor<ReadableFile>() {} ))
.via((ReadableFile file) -> {
return KV.of(file.getMetadata().resourceId().getFilename(),file);
})
);
非常感谢任何建议。
感谢您花时间审阅此内容。
编辑:
我采纳了 Alexey 的建议并实现了自定义 XmlIO。如果我们可以扩展我们需要的 class 并覆盖适当的方法,那就太好了。但是,在这种特定情况下,有一个方法引用在 sdk 中受到保护,因此我无法轻易覆盖我需要的方法,而是最终复制了一大堆文件。虽然这目前有效,但我希望将来有一种更直接的方式来访问这些 IO 实现中的文件元数据。
我认为不可能用 XmlIO 的当前实现执行“out-of-box”,因为它 return 是一个 PCollection<T>
,其中 T
是一种类型你的 xml 记录,如果我没记错的话,没有办法在那里添加文件名。尽管如此,您仍然可以尝试“重新实现”ReadFiles
和 XmlSource
以return 解析有效负载和输入文件元数据的方式。
我正在寻找一种方法来访问在 DoFn 中进行数据转换期间正在处理的文件的名称。
我的流水线如下图:
Pipeline p = Pipeline.create(options);
p.apply(FileIO.match()
.filepattern(options.getInput())
.continuously(Duration.standardSeconds(5),
Watch.Growth.<String>never()))
.apply(FileIO.readMatches()
.withCompression(Compression.GZIP))
.apply(XmlIO.<MyString>readFiles()
.withRootElement("root")
.withRecordElement("record")
.withRecordClass(MyString.class))//<-- This only returns the contents of the file
.apply(ParDo.of(new ProcessRecord()))//<-- I need to access file name here
.apply(ParDo.of(new FormatRecord()))
.apply(Window.<String>into(FixedWindows.of(Duration.standardSeconds(5))))
.apply(new CustomWrite(options));
处理的每个文件都是一个 XML 文档。在处理内容时,我也需要访问正在处理的文件的名称以包含在转换后的记录中。
有办法实现吗?
这个 post 有一个类似的问题,但由于我正在尝试使用 XmlIO,所以我还没有找到访问文件元数据的方法。
下面是我在网上找到的方法,不知道有没有办法在上面介绍的pipeline中使用。
p.apply(FileIO.match()
.filepattern(options.getInput())
.continuously(Duration.standardSeconds(5),
Watch.Growth.<String>never()))//File Metadata
.apply(FileIO.readMatches()
.withCompression(Compression.GZIP))//Readable Files
.apply(MapElements
.into(TypeDescriptors.kvs(TypeDescriptors.strings(),new TypeDescriptor<ReadableFile>() {} ))
.via((ReadableFile file) -> {
return KV.of(file.getMetadata().resourceId().getFilename(),file);
})
);
非常感谢任何建议。 感谢您花时间审阅此内容。
编辑:
我采纳了 Alexey 的建议并实现了自定义 XmlIO。如果我们可以扩展我们需要的 class 并覆盖适当的方法,那就太好了。但是,在这种特定情况下,有一个方法引用在 sdk 中受到保护,因此我无法轻易覆盖我需要的方法,而是最终复制了一大堆文件。虽然这目前有效,但我希望将来有一种更直接的方式来访问这些 IO 实现中的文件元数据。
我认为不可能用 XmlIO 的当前实现执行“out-of-box”,因为它 return 是一个 PCollection<T>
,其中 T
是一种类型你的 xml 记录,如果我没记错的话,没有办法在那里添加文件名。尽管如此,您仍然可以尝试“重新实现”ReadFiles
和 XmlSource
以return 解析有效负载和输入文件元数据的方式。