使用 ValueProvider 作为 Apache Beam 中的路径提取 zip 内容
Extract zip contents using ValueProvider as path in Apache Beam
我有一个代码可以提取 Google 云存储中 .ZIP 文件的内容。它工作正常,但我需要将此代码与将在运行时提供的文件路径一起使用 ("gs://some_bucket/filename.zip")。当我使用运行时值尝试它时,出现如下错误:
Exception in thread "main" java.lang.IllegalArgumentException: unable to serialize org.apache.beam.sdk.io.gcp.bigquery.BigQueryQuerySource@187bc24
at org.apache.beam.sdk.util.SerializableUtils.serializeToByteArray(SerializableUtils.java:53)
at org.apache.beam.sdk.util.SerializableUtils.ensureSerializable(SerializableUtils.java:83)
at org.apache.beam.sdk.io.Read$Bounded.<init>(Read.java:94)
at org.apache.beam.sdk.io.Read$Bounded.<init>(Read.java:89)
at org.apache.beam.sdk.io.Read.from(Read.java:48)
at org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO$Read.expand(BigQueryIO.java:535)
at org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO$Read.expand(BigQueryIO.java:292)
at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:482)
at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:422)
at org.apache.beam.sdk.values.PBegin.apply(PBegin.java:44)
at org.apache.beam.sdk.Pipeline.apply(Pipeline.java:164)
at BeamTest2.StarterPipeline.main(StarterPipeline.java:180)
Caused by: java.io.NotSerializableException: org.apache.beam.sdk.Pipeline
at java.io.ObjectOutputStream.writeObject0(Unknown Source)
at java.io.ObjectOutputStream.defaultWriteFields(Unknown Source)
at java.io.ObjectOutputStream.writeSerialData(Unknown Source)
at java.io.ObjectOutputStream.writeOrdinaryObject(Unknown Source)
at java.io.ObjectOutputStream.writeObject0(Unknown Source)
at java.io.ObjectOutputStream.defaultWriteFields(Unknown Source)
at java.io.ObjectOutputStream.writeSerialData(Unknown Source)
at java.io.ObjectOutputStream.writeOrdinaryObject(Unknown Source)
at java.io.ObjectOutputStream.writeObject0(Unknown Source)
at java.io.ObjectOutputStream.defaultWriteFields(Unknown Source)
at java.io.ObjectOutputStream.writeSerialData(Unknown Source)
at java.io.ObjectOutputStream.writeOrdinaryObject(Unknown Source)
at java.io.ObjectOutputStream.writeObject0(Unknown Source)
at java.io.ObjectOutputStream.writeObject(Unknown Source)
at org.apache.beam.sdk.util.SerializableUtils.serializeToByteArray(SerializableUtils.java:49)
... 11 more
我使用的代码是:
//Unzip incoming file
PCollection<TableRow> temp = p.apply(BigQueryIO.read().fromQuery(
NestedValueProvider.of(
options.getInputFile(),
new SerializableFunction<String, String>() {
private static final long serialVersionUID = 1L;
@Override
public String apply(String filepath) {
try{
List<GcsPath> gcsPaths = util.expand(GcsPath.fromUri(filepath));
LOG.info(gcsPaths+"FilesUnzipped");
List<String> paths = new ArrayList<String>();
for(GcsPath gcsp: gcsPaths){
paths.add(gcsp.toString());
}
p.apply(Create.of(paths))
.apply(ParDo.of(new UnzipFN(filepath)));
}
catch(Exception e)
{
LOG.info("Exception caught while extracting ZIP");
}
return "";
}
})).usingStandardSql().withoutValidation());
解压FN class:
public class UnzipFN extends DoFn<String,Long>{
private long filesUnzipped=0;
@ProcessElement
public void processElement(ProcessContext c){
String p = c.element();
GcsUtilFactory factory = new GcsUtilFactory();
GcsUtil u = factory.create(c.getPipelineOptions());
byte[] buffer = new byte[100000000];
try{
SeekableByteChannel sek = u.open(GcsPath.fromUri(p));
InputStream is = Channels.newInputStream(sek);
BufferedInputStream bis = new BufferedInputStream(is);
ZipInputStream zis = new ZipInputStream(bis);
ZipEntry ze = zis.getNextEntry();
while(ze!=null){
LOG.info("Unzipping File {}",ze.getName());
WritableByteChannel wri = u.create(GcsPath.fromUri("gs://bucket_location/" + ze.getName()), getType(ze.getName()));
OutputStream os = Channels.newOutputStream(wri);
int len;
while((len=zis.read(buffer))>0){
os.write(buffer,0,len);
}
os.close();
filesUnzipped++;
ze=zis.getNextEntry();
}
zis.closeEntry();
zis.close();
}
catch(Exception e){
e.printStackTrace();
}
c.output(filesUnzipped);
System.out.println(filesUnzipped+"FilesUnzipped");
LOG.info("FilesUnzipped");
}
private String getType(String fName){
if(fName.endsWith(".zip")){
return "application/x-zip-compressed";
}
else {
return "text/plain";
}
}
}
如何处理这种情况?
P.S。 - .zip 提取代码与 BigQueryIO.read() 无关。我只是将它用作能够读取运行时值的 hack。如果您对此有任何其他建议,请告诉我。
谢谢。
如果我没理解错的话,你有一个包含文件模式的 ValueProvider<String>
,你正在使用 GcsUtil.expand()
扩展文件模式,并且你想应用一个函数 (UnzipFn
)每个生成的文件名。
由于以下几个原因,当前代码将无法运行:
- 您正在创建一个
BigQueryIO.read().fromQuery()
,其中 fromQuery()
的参数是一个 ValueProvider
,它总是 returns 空字符串(您的 NestedValueProvider
,之后做一堆事情,总是 returns 空字符串 ""
)。这将在 运行 时失败,因为查询不能为空。使用 BigQueryIO
作为黑客尝试访问 ValueProvider
不是一个好主意 - 见下文。
- 您正在向函数内的管道添加步骤以从
ValueProvider
中提取值。该函数在管道 运行ning 时从工作人员调用,以获取提供者的 运行 时间值。当管道 运行ning. 时,无法从其工作人员向管道添加步骤
- 您正在将
Pipeline
对象捕获到 SerializableFunction
闭包中,但它无法序列化,因为 Pipeline
不是 Serializable
- 因为没有合法的序列化 Pipeline
Java 对象的用例:它永远不需要发送给工作人员或 运行ner,它只是一个临时构建器对象,用于在您的主程序中构建一些东西您可以稍后调用 .run()
。另一方面,SerializableFunction
将运送给工人,以便他们评估ValueProvider
的当前价值。
将 ValueProvider
视为占位符,它仅在管道 运行 时有值,但在构建时没有值 - 例如你可以从 DoFn
中调用 provider.get()
。 NestedValueProvider
根本没有改变这一点——它只是用一些简单的转换逻辑包装另一个 ValueProvider
,当你有一个 ValueProvider<Something>
但需要它作为 ValueProvider<SomethingSlightlyDifferent>
.
问题的症结在于您正在尝试使用仅在 运行 时间(您的 options.getInputFile()
ValueProvider
)可用的值来执行构建时的操作 - 创建流水线步骤 Create.of(paths)
。逻辑上不可能在构造时规避 ValueProvider
的不可用性:ValueProvider
存在专门用于表示构造时 尚未提供的值 ,因此它们在管道描述中保留为占位符,并且仅当管道为 运行 时才作为参数提供。你需要想出一个管道结构,其中输入文件是一个占位符,管道以你想要的方式处理它。
你可以这样做:
p.apply(Create.ofProvider(options.getInputFile(), StringUtf8Coder.of()))
.apply(ParDo.of(new ExpandFn()))
.apply(...fusion break...)
.apply(ParDo.of(new UnzipFn()))
其中 ExpandFn
将是一个 DoFn
,它接受一个 String
并执行您的 GcsUtil.expand()
操作,对于融合中断,请参见例如JdbcIO.java
.
的实施
在 Beam 2.2 中(您目前可以在 HEAD 中使用它)您不需要 ExpandFn
- 已经存在可以扩展文件模式等的转换(例如,它可以逐步扩展文件模式和在流媒体管道中继续观察匹配它的新文件)。所以你可以写得更简洁:
p.apply(FileIO.match().filepattern(options.getInputFile()))
.apply(...fusion break...)
.apply(ParDo.of(new UnzipFn()));
我有一个代码可以提取 Google 云存储中 .ZIP 文件的内容。它工作正常,但我需要将此代码与将在运行时提供的文件路径一起使用 ("gs://some_bucket/filename.zip")。当我使用运行时值尝试它时,出现如下错误:
Exception in thread "main" java.lang.IllegalArgumentException: unable to serialize org.apache.beam.sdk.io.gcp.bigquery.BigQueryQuerySource@187bc24
at org.apache.beam.sdk.util.SerializableUtils.serializeToByteArray(SerializableUtils.java:53)
at org.apache.beam.sdk.util.SerializableUtils.ensureSerializable(SerializableUtils.java:83)
at org.apache.beam.sdk.io.Read$Bounded.<init>(Read.java:94)
at org.apache.beam.sdk.io.Read$Bounded.<init>(Read.java:89)
at org.apache.beam.sdk.io.Read.from(Read.java:48)
at org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO$Read.expand(BigQueryIO.java:535)
at org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO$Read.expand(BigQueryIO.java:292)
at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:482)
at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:422)
at org.apache.beam.sdk.values.PBegin.apply(PBegin.java:44)
at org.apache.beam.sdk.Pipeline.apply(Pipeline.java:164)
at BeamTest2.StarterPipeline.main(StarterPipeline.java:180)
Caused by: java.io.NotSerializableException: org.apache.beam.sdk.Pipeline
at java.io.ObjectOutputStream.writeObject0(Unknown Source)
at java.io.ObjectOutputStream.defaultWriteFields(Unknown Source)
at java.io.ObjectOutputStream.writeSerialData(Unknown Source)
at java.io.ObjectOutputStream.writeOrdinaryObject(Unknown Source)
at java.io.ObjectOutputStream.writeObject0(Unknown Source)
at java.io.ObjectOutputStream.defaultWriteFields(Unknown Source)
at java.io.ObjectOutputStream.writeSerialData(Unknown Source)
at java.io.ObjectOutputStream.writeOrdinaryObject(Unknown Source)
at java.io.ObjectOutputStream.writeObject0(Unknown Source)
at java.io.ObjectOutputStream.defaultWriteFields(Unknown Source)
at java.io.ObjectOutputStream.writeSerialData(Unknown Source)
at java.io.ObjectOutputStream.writeOrdinaryObject(Unknown Source)
at java.io.ObjectOutputStream.writeObject0(Unknown Source)
at java.io.ObjectOutputStream.writeObject(Unknown Source)
at org.apache.beam.sdk.util.SerializableUtils.serializeToByteArray(SerializableUtils.java:49)
... 11 more
我使用的代码是:
//Unzip incoming file
PCollection<TableRow> temp = p.apply(BigQueryIO.read().fromQuery(
NestedValueProvider.of(
options.getInputFile(),
new SerializableFunction<String, String>() {
private static final long serialVersionUID = 1L;
@Override
public String apply(String filepath) {
try{
List<GcsPath> gcsPaths = util.expand(GcsPath.fromUri(filepath));
LOG.info(gcsPaths+"FilesUnzipped");
List<String> paths = new ArrayList<String>();
for(GcsPath gcsp: gcsPaths){
paths.add(gcsp.toString());
}
p.apply(Create.of(paths))
.apply(ParDo.of(new UnzipFN(filepath)));
}
catch(Exception e)
{
LOG.info("Exception caught while extracting ZIP");
}
return "";
}
})).usingStandardSql().withoutValidation());
解压FN class:
public class UnzipFN extends DoFn<String,Long>{
private long filesUnzipped=0;
@ProcessElement
public void processElement(ProcessContext c){
String p = c.element();
GcsUtilFactory factory = new GcsUtilFactory();
GcsUtil u = factory.create(c.getPipelineOptions());
byte[] buffer = new byte[100000000];
try{
SeekableByteChannel sek = u.open(GcsPath.fromUri(p));
InputStream is = Channels.newInputStream(sek);
BufferedInputStream bis = new BufferedInputStream(is);
ZipInputStream zis = new ZipInputStream(bis);
ZipEntry ze = zis.getNextEntry();
while(ze!=null){
LOG.info("Unzipping File {}",ze.getName());
WritableByteChannel wri = u.create(GcsPath.fromUri("gs://bucket_location/" + ze.getName()), getType(ze.getName()));
OutputStream os = Channels.newOutputStream(wri);
int len;
while((len=zis.read(buffer))>0){
os.write(buffer,0,len);
}
os.close();
filesUnzipped++;
ze=zis.getNextEntry();
}
zis.closeEntry();
zis.close();
}
catch(Exception e){
e.printStackTrace();
}
c.output(filesUnzipped);
System.out.println(filesUnzipped+"FilesUnzipped");
LOG.info("FilesUnzipped");
}
private String getType(String fName){
if(fName.endsWith(".zip")){
return "application/x-zip-compressed";
}
else {
return "text/plain";
}
}
}
如何处理这种情况?
P.S。 - .zip 提取代码与 BigQueryIO.read() 无关。我只是将它用作能够读取运行时值的 hack。如果您对此有任何其他建议,请告诉我。
谢谢。
如果我没理解错的话,你有一个包含文件模式的 ValueProvider<String>
,你正在使用 GcsUtil.expand()
扩展文件模式,并且你想应用一个函数 (UnzipFn
)每个生成的文件名。
由于以下几个原因,当前代码将无法运行:
- 您正在创建一个
BigQueryIO.read().fromQuery()
,其中fromQuery()
的参数是一个ValueProvider
,它总是 returns 空字符串(您的NestedValueProvider
,之后做一堆事情,总是 returns 空字符串""
)。这将在 运行 时失败,因为查询不能为空。使用BigQueryIO
作为黑客尝试访问ValueProvider
不是一个好主意 - 见下文。 - 您正在向函数内的管道添加步骤以从
ValueProvider
中提取值。该函数在管道 运行ning 时从工作人员调用,以获取提供者的 运行 时间值。当管道 运行ning. 时,无法从其工作人员向管道添加步骤
- 您正在将
Pipeline
对象捕获到SerializableFunction
闭包中,但它无法序列化,因为Pipeline
不是Serializable
- 因为没有合法的序列化Pipeline
Java 对象的用例:它永远不需要发送给工作人员或 运行ner,它只是一个临时构建器对象,用于在您的主程序中构建一些东西您可以稍后调用.run()
。另一方面,SerializableFunction
将运送给工人,以便他们评估ValueProvider
的当前价值。
将 ValueProvider
视为占位符,它仅在管道 运行 时有值,但在构建时没有值 - 例如你可以从 DoFn
中调用 provider.get()
。 NestedValueProvider
根本没有改变这一点——它只是用一些简单的转换逻辑包装另一个 ValueProvider
,当你有一个 ValueProvider<Something>
但需要它作为 ValueProvider<SomethingSlightlyDifferent>
.
问题的症结在于您正在尝试使用仅在 运行 时间(您的 options.getInputFile()
ValueProvider
)可用的值来执行构建时的操作 - 创建流水线步骤 Create.of(paths)
。逻辑上不可能在构造时规避 ValueProvider
的不可用性:ValueProvider
存在专门用于表示构造时 尚未提供的值 ,因此它们在管道描述中保留为占位符,并且仅当管道为 运行 时才作为参数提供。你需要想出一个管道结构,其中输入文件是一个占位符,管道以你想要的方式处理它。
你可以这样做:
p.apply(Create.ofProvider(options.getInputFile(), StringUtf8Coder.of()))
.apply(ParDo.of(new ExpandFn()))
.apply(...fusion break...)
.apply(ParDo.of(new UnzipFn()))
其中 ExpandFn
将是一个 DoFn
,它接受一个 String
并执行您的 GcsUtil.expand()
操作,对于融合中断,请参见例如JdbcIO.java
.
在 Beam 2.2 中(您目前可以在 HEAD 中使用它)您不需要 ExpandFn
- 已经存在可以扩展文件模式等的转换(例如,它可以逐步扩展文件模式和在流媒体管道中继续观察匹配它的新文件)。所以你可以写得更简洁:
p.apply(FileIO.match().filepattern(options.getInputFile()))
.apply(...fusion break...)
.apply(ParDo.of(new UnzipFn()));