使用 ValueProvider 作为 Apache Beam 中的路径提取 zip 内容

Extract zip contents using ValueProvider as path in Apache Beam

我有一个代码可以提取 Google 云存储中 .ZIP 文件的内容。它工作正常,但我需要将此代码与将在运行时提供的文件路径一起使用 ("gs://some_bucket/filename.zip")。当我使用运行时值尝试它时,出现如下错误:

Exception in thread "main" java.lang.IllegalArgumentException: unable to serialize org.apache.beam.sdk.io.gcp.bigquery.BigQueryQuerySource@187bc24
    at org.apache.beam.sdk.util.SerializableUtils.serializeToByteArray(SerializableUtils.java:53)
    at org.apache.beam.sdk.util.SerializableUtils.ensureSerializable(SerializableUtils.java:83)
    at org.apache.beam.sdk.io.Read$Bounded.<init>(Read.java:94)
    at org.apache.beam.sdk.io.Read$Bounded.<init>(Read.java:89)
    at org.apache.beam.sdk.io.Read.from(Read.java:48)
    at org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO$Read.expand(BigQueryIO.java:535)
    at org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO$Read.expand(BigQueryIO.java:292)
    at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:482)
    at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:422)
    at org.apache.beam.sdk.values.PBegin.apply(PBegin.java:44)
    at org.apache.beam.sdk.Pipeline.apply(Pipeline.java:164)
    at BeamTest2.StarterPipeline.main(StarterPipeline.java:180)
Caused by: java.io.NotSerializableException: org.apache.beam.sdk.Pipeline
    at java.io.ObjectOutputStream.writeObject0(Unknown Source)
    at java.io.ObjectOutputStream.defaultWriteFields(Unknown Source)
    at java.io.ObjectOutputStream.writeSerialData(Unknown Source)
    at java.io.ObjectOutputStream.writeOrdinaryObject(Unknown Source)
    at java.io.ObjectOutputStream.writeObject0(Unknown Source)
    at java.io.ObjectOutputStream.defaultWriteFields(Unknown Source)
    at java.io.ObjectOutputStream.writeSerialData(Unknown Source)
    at java.io.ObjectOutputStream.writeOrdinaryObject(Unknown Source)
    at java.io.ObjectOutputStream.writeObject0(Unknown Source)
    at java.io.ObjectOutputStream.defaultWriteFields(Unknown Source)
    at java.io.ObjectOutputStream.writeSerialData(Unknown Source)
    at java.io.ObjectOutputStream.writeOrdinaryObject(Unknown Source)
    at java.io.ObjectOutputStream.writeObject0(Unknown Source)
    at java.io.ObjectOutputStream.writeObject(Unknown Source)
    at org.apache.beam.sdk.util.SerializableUtils.serializeToByteArray(SerializableUtils.java:49)
    ... 11 more

我使用的代码是:

 //Unzip incoming file
      PCollection<TableRow> temp = p.apply(BigQueryIO.read().fromQuery(
      NestedValueProvider.of(
          options.getInputFile(),
          new SerializableFunction<String, String>() {
            private static final long serialVersionUID = 1L;
            @Override
            public String apply(String filepath) {
                try{

                 List<GcsPath> gcsPaths = util.expand(GcsPath.fromUri(filepath));
                 LOG.info(gcsPaths+"FilesUnzipped");
                  List<String> paths = new ArrayList<String>();

                  for(GcsPath gcsp: gcsPaths){
                      paths.add(gcsp.toString());
                  }
                  p.apply(Create.of(paths))
                      .apply(ParDo.of(new UnzipFN(filepath)));

                }
                catch(Exception e)
                {
                    LOG.info("Exception caught while extracting ZIP");
                }
                return "";
            }
          })).usingStandardSql().withoutValidation());

解压FN class:

public class UnzipFN extends DoFn<String,Long>{
    private long filesUnzipped=0;
    @ProcessElement
    public void processElement(ProcessContext c){
        String p = c.element();
        GcsUtilFactory factory = new GcsUtilFactory();
        GcsUtil u = factory.create(c.getPipelineOptions());
        byte[] buffer = new byte[100000000];
        try{
            SeekableByteChannel sek = u.open(GcsPath.fromUri(p));
            InputStream is = Channels.newInputStream(sek);
            BufferedInputStream bis = new BufferedInputStream(is);
            ZipInputStream zis = new ZipInputStream(bis);
            ZipEntry ze = zis.getNextEntry();
            while(ze!=null){
                LOG.info("Unzipping File {}",ze.getName());
                WritableByteChannel wri = u.create(GcsPath.fromUri("gs://bucket_location/" + ze.getName()), getType(ze.getName()));
                OutputStream os = Channels.newOutputStream(wri);
                int len;
                while((len=zis.read(buffer))>0){
                    os.write(buffer,0,len);
                }
                os.close();
                filesUnzipped++;
                ze=zis.getNextEntry();


            }
            zis.closeEntry();
            zis.close();

        }
        catch(Exception e){
            e.printStackTrace();
        }
    c.output(filesUnzipped);
    System.out.println(filesUnzipped+"FilesUnzipped");
    LOG.info("FilesUnzipped");
    }

    private String getType(String fName){
        if(fName.endsWith(".zip")){
            return "application/x-zip-compressed";
        }
        else {
            return "text/plain";
        }
    }
}

如何处理这种情况?

P.S。 - .zip 提取代码与 BigQueryIO.read() 无关。我只是将它用作能够读取运行时值的 hack。如果您对此有任何其他建议,请告诉我。

谢谢。

如果我没理解错的话,你有一个包含文件模式的 ValueProvider<String>,你正在使用 GcsUtil.expand() 扩展文件模式,并且你想应用一个函数 (UnzipFn)每个生成的文件名。

由于以下几个原因,当前代码将无法运行:

  • 您正在创建一个 BigQueryIO.read().fromQuery(),其中 fromQuery() 的参数是一个 ValueProvider,它总是 returns 空字符串(您的 NestedValueProvider,之后做一堆事情,总是 returns 空字符串 "")。这将在 运行 时失败,因为查询不能为空。使用 BigQueryIO 作为黑客尝试访问 ValueProvider 不是一个好主意 - 见下文。
  • 您正在向函数内的管道添加步骤以从 ValueProvider 中提取值。该函数在管道 运行ning 时从工作人员调用,以获取提供者的 运行 时间值。当管道 运行ning.
  • 时,无法从其工作人员向管道添加步骤
  • 您正在将 Pipeline 对象捕获到 SerializableFunction 闭包中,但它无法序列化,因为 Pipeline 不是 Serializable - 因为没有合法的序列化 Pipeline Java 对象的用例:它永远不需要发送给工作人员或 运行ner,它只是一个临时构建器对象,用于在您的主程序中构建一些东西您可以稍后调用 .run()。另一方面,SerializableFunction运送给工人,以便他们评估ValueProvider的当前价值。

ValueProvider 视为占位符,它仅在管道 运行 时有值,但在构建时没有值 - 例如你可以从 DoFn 中调用 provider.get()NestedValueProvider 根本没有改变这一点——它只是用一些简单的转换逻辑包装另一个 ValueProvider,当你有一个 ValueProvider<Something> 但需要它作为 ValueProvider<SomethingSlightlyDifferent>.

问题的症结在于您正在尝试使用仅在 运行 时间(您的 options.getInputFile() ValueProvider)可用的值来执行构建时的操作 - 创建流水线步骤 Create.of(paths)。逻辑上不可能在构造时规避 ValueProvider 的不可用性:ValueProvider 存在专门用于表示构造时 尚未提供的值 ,因此它们在管道描述中保留为占位符,并且仅当管道为 运行 时才作为参数提供。你需要想出一个管道结构,其中输入文件是一个占位符,管道以你想要的方式处理它。

你可以这样做:

p.apply(Create.ofProvider(options.getInputFile(), StringUtf8Coder.of()))
 .apply(ParDo.of(new ExpandFn()))
 .apply(...fusion break...)
 .apply(ParDo.of(new UnzipFn()))

其中 ExpandFn 将是一个 DoFn,它接受一个 String 并执行您的 GcsUtil.expand() 操作,对于融合中断,请参见例如JdbcIO.java.

的实施

在 Beam 2.2 中(您目前可以在 HEAD 中使用它)您不需要 ExpandFn - 已经存在可以扩展文件模式等的转换(例如,它可以逐步扩展文件模式和在流媒体管道中继续观察匹配它的新文件)。所以你可以写得更简洁:

p.apply(FileIO.match().filepattern(options.getInputFile()))
 .apply(...fusion break...)
 .apply(ParDo.of(new UnzipFn()));