Google DataFlow:将文件名附加到邮件

Google DataFlow: attaching filename to the message

我正在尝试构建 Google DataFlow 管道,它包含以下步骤:

我的问题是我无法将文件名添加到最终输出消息中。 当前实施:

ConnectorOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().as(ConnectorOptions.class);

Pipeline p = Pipeline.create(options);

p.apply("ReadFromTopic", PubsubIO.readMessages().fromTopic(options.getInputTopic()))
    .apply("CollectFiles", ParDo.of(new DoFn<PubsubMessage, String>() {
        @ProcessElement
        public void processElement(ProcessContext c) {
            String fileName = new String(c.element().getPayload());
            c.output("gs://bucket-name/" + fileName);
        }
    }))
    .apply("ReadLines", TextIO.readAll())
    .apply("WriteItemsToTopic", PubsubIO.writeStrings().to(options.getOutputTopic()));

p.run().waitUntilFinish();

我在 here 之前看到过类似的问题,但这对我来说并不是一个真正有效的解决方案,因为我必须将文件名附加到每个输出消息,而不仅仅是对每一行进行解析。 谁能告诉我可能的解决方案?

更新

谢谢@jkff,我听从了你的建议和我目前的解决方案代码:

ConnectorOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().as(ConnectorOptions.class);
    Pipeline p = Pipeline.create(options);
    p.apply("ReadFromTopic", PubsubIO.readMessages().fromSubscription(options.getInputSubscription()))
            .apply("PrintMessages", ParDo.of(new DoFn<PubsubMessage, String>() {
                @ProcessElement
                public void processElement(ProcessContext c) {
                    String message = new String(c.element().getPayload());
                    c.output("gs://bucket/" + message);
                }
            }))
            .apply(FileIO.matchAll())
            .apply(FileIO.readMatches())
            .apply("ReadFile", ParDo.of(new DoFn<FileIO.ReadableFile, String>() {
                @ProcessElement
                public void processElement(ProcessContext c) throws IOException {
                    FileIO.ReadableFile f = c.element();

                    String filePath = f.getMetadata().resourceId().toString();
                    String fileName = filePath.substring(filePath.lastIndexOf("/") + 1);

                    ReadableByteChannel inChannel = f.open();
                    ByteBuffer buffer = ByteBuffer.allocate(1);
                    StringBuffer line = new StringBuffer();
                    while (inChannel.read(buffer) > 0) {
                        buffer.flip();
                        for (int i = 0; i < buffer.limit(); i++) {
                            char ch = ((char) buffer.get());
                            if (ch == '\r') {
                                c.output(line.toString() + " " + fileName);
                                line = new StringBuffer();
                            } else {
                                line.append(ch);
                            }
                        }
                        buffer.clear();
                    }
                    inChannel.close();
                }
            }))
            .apply("WriteItemsToTopic", PubsubIO.writeStrings().to(options.getOutputTopic()));
    p.run().waitUntilFinish();

您可以使用 FileIO - 使用 FileIO.matchAll() 后跟 FileIO.readMatches() 得到一个 PCollection<ReadableFile>,其中每个 ReadableFile 都可以用来获取文件名并读取文件。跟着一个 DoFn 做你想做的事。要阅读该文件,请使用 ReadableFile.open() 上的标准 Java 图书馆设施。