如何在无界管道中访问 DoFn 中的文件名

How to access file name within a DoFn in an unbounded pipeline

我正在寻找一种方法来访问在 DoFn 中进行数据转换期间正在处理的文件的名称。

我的流水线如下图:

Pipeline p = Pipeline.create(options);

p.apply(FileIO.match()
                .filepattern(options.getInput())
                    .continuously(Duration.standardSeconds(5), 
                            Watch.Growth.<String>never()))
.apply(FileIO.readMatches()
            .withCompression(Compression.GZIP))
.apply(XmlIO.<MyString>readFiles()
            .withRootElement("root")
                .withRecordElement("record")
                    .withRecordClass(MyString.class))//<-- This only returns the contents of the file
.apply(ParDo.of(new ProcessRecord()))//<-- I need to access file name here
.apply(ParDo.of(new FormatRecord()))
.apply(Window.<String>into(FixedWindows.of(Duration.standardSeconds(5))))
.apply(new CustomWrite(options));

处理的每个文件都是一个 XML 文档。在处理内容时,我也需要访问正在处理的文件的名称以包含在转换后的记录中。

有办法实现吗?

这个 post 有一个类似的问题,但由于我正在尝试使用 XmlIO,所以我还没有找到访问文件元数据的方法。

下面是我在网上找到的方法,不知道有没有办法在上面介绍的pipeline中使用。

p.apply(FileIO.match()
                .filepattern(options.getInput())
                    .continuously(Duration.standardSeconds(5), 
                            Watch.Growth.<String>never()))//File Metadata
.apply(FileIO.readMatches()
            .withCompression(Compression.GZIP))//Readable Files
.apply(MapElements
            .into(TypeDescriptors.kvs(TypeDescriptors.strings(),new TypeDescriptor<ReadableFile>() {} ))
            .via((ReadableFile file) -> {
                return KV.of(file.getMetadata().resourceId().getFilename(),file);
            })
 );         

非常感谢任何建议。 感谢您花时间审阅此内容。

编辑:

我采纳了 Alexey 的建议并实现了自定义 XmlIO。如果我们可以扩展我们需要的 class 并覆盖适当的方法,那就太好了。但是,在这种特定情况下,有一个方法引用在 sdk 中受到保护,因此我无法轻易覆盖我需要的方法,而是最终复制了一大堆文件。虽然这目前有效,但我希望将来有一种更直接的方式来访问这些 IO 实现中的文件元数据。

我认为不可能用 XmlIO 的当前实现执行“out-of-box”,因为它 return 是一个 PCollection<T>,其中 T 是一种类型你的 xml 记录,如果我没记错的话,没有办法在那里添加文件名。尽管如此,您仍然可以尝试“重新实现”ReadFilesXmlSource 以return 解析有效负载和输入文件元数据的方式。