Exception while writing multipart empty CSV file from Apache Beam into NetApp StorageGRID
Problem statement
We are consuming multiple CSV files into PCollections -> applying Beam SQL to transform the data -> writing the resulting PCollection.
This works absolutely fine if we have some data in all the source PCollections and Beam SQL produces a new collection with some data in it.
When the transform produces an empty PCollection and we write it to NetApp StorageGRID, it throws the following:
Exception in thread "main" org.apache.beam.sdk.Pipeline$PipelineExecutionException: java.io.IOException: Failed closing channel to s3://bucket-name/.temp-beam-847f362f-8884-454e-bfbe-baf9a4e32806/0b72948a5fcccb28-174f-426b-a225-ae3d3cbb126f
at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:373)
at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:341)
at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:218)
at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:67)
at org.apache.beam.sdk.Pipeline.run(Pipeline.java:323)
at org.apache.beam.sdk.Pipeline.run(Pipeline.java:309)
at ECSOperations.main(ECSOperations.java:53)
Caused by: java.io.IOException: Failed closing channel to s3://bucket-name/.temp-beam-847f362f-8884-454e-bfbe-baf9a4e32806/0b72948a5fcccb28-174f-426b-a225-ae3d3cbb126f
at org.apache.beam.sdk.io.FileBasedSink$Writer.close(FileBasedSink.java:1076)
at org.apache.beam.sdk.io.FileBasedSink$WriteOperation.createMissingEmptyShards(FileBasedSink.java:759)
at org.apache.beam.sdk.io.FileBasedSink$WriteOperation.finalizeDestination(FileBasedSink.java:639)
at org.apache.beam.sdk.io.WriteFiles$FinalizeTempFileBundles$FinalizeFn.process(WriteFiles.java:1040)
Caused by: java.io.IOException: com.amazonaws.services.s3.model.AmazonS3Exception: Your proposed upload is smaller than the minimum allowed object size. (Service: Amazon S3; Status Code: 400; Error Code: EntityTooSmall; Request ID: 1643869619144605; S3 Extended Request ID: null; Proxy: null), S3 Extended Request ID: null
Sample code is given below:
ECSOptions options = PipelineOptionsFactory.fromArgs(args).as(ECSOptions.class);
setupConfiguration(options);
Pipeline p = Pipeline.create(options);
PCollection<String> pSource = p.apply(TextIO.read().from("src/main/resources/empty.csv"));
pSource.apply(TextIO.write().to("s3://bucket-name/empty.csv").withoutSharding());
p.run();
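ECSOptions and setupConfiguration are not shown in the question; presumably they extend Beam's AWS/S3 pipeline options so that the s3:// scheme resolves to the StorageGRID endpoint. A purely hypothetical sketch of that wiring (the interface body, endpoint, and region are placeholders, not taken from the post):

import org.apache.beam.sdk.io.aws.options.S3Options;

// Hypothetical stand-in for the ECSOptions interface used in the question.
public interface ECSOptions extends S3Options {
}

static void setupConfiguration(ECSOptions options) {
  // Placeholder values: point Beam's S3 filesystem at the StorageGRID S3-compatible endpoint.
  options.setAwsServiceEndpoint("https://storagegrid.example.com");
  options.setAwsRegion("us-east-1");
}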
Observations
- It works fine if we write a simple file instead of a multipart file (i.e. simply put the object into StorageGRID); a minimal sketch of such a plain put follows this list.
- It appears to be a known issue with StorageGRID, but we wanted to check whether we can handle it from the Beam pipeline.
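For reference, a minimal sketch of such a plain (non-multipart) put using the AWS SDK v1 client directly; the endpoint and region are placeholders, and credentials are assumed to come from the default provider chain:

import com.amazonaws.client.builder.AwsClientBuilder;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3ClientBuilder;

// A single PutObject request (no multipart upload) succeeds even with empty content.
AmazonS3 s3 = AmazonS3ClientBuilder.standard()
    .withEndpointConfiguration(
        new AwsClientBuilder.EndpointConfiguration("https://storagegrid.example.com", "us-east-1"))
    .withPathStyleAccessEnabled(true) // often needed for S3-compatible stores
    .build();
s3.putObject("bucket-name", "empty.csv", ""); // putObject(bucketName, key, content)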
What I have tried
- Tried to check the size of the PCollection before writing and to put some placeholder string into the output file, but since the PCollection is empty it never enters the PTransform at all.
- Also tried Count.globally, but even that did not help.
Ask
- Can we handle this in Beam, e.g. by checking the size of the PCollection before writing? If the size is zero, i.e. the PCollection is empty, we could skip writing the file and avoid this issue altogether.
- Has anyone faced a similar issue and managed to resolve it?
You can't check whether a PCollection is empty during pipeline construction, because it hasn't been computed yet. If this filesystem can't support empty files, you could try writing to another filesystem and then copying the file iff it is not empty (assuming the file in question isn't too large); a sketch of that check is given below.
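A minimal sketch of that copy-iff-not-empty step, run after p.run().waitUntilFinish(), using Beam's FileSystems API (the source and destination paths are placeholders, not from the original post):

import java.io.IOException;
import java.util.Collections;
import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.io.fs.MatchResult;
import org.apache.beam.sdk.io.fs.ResourceId;

/** Copies srcSpec to destSpec only if the written file is non-empty. */
static void copyIfNotEmpty(String srcSpec, String destSpec) throws IOException {
  // Look up the file the pipeline wrote to the intermediate filesystem.
  MatchResult.Metadata src = FileSystems.match(srcSpec).metadata().get(0);
  if (src.sizeBytes() > 0) {
    ResourceId dest = FileSystems.matchNewResource(destSpec, false /* isDirectory */);
    FileSystems.copy(
        Collections.singletonList(src.resourceId()),
        Collections.singletonList(dest));
  }
}

For example, copyIfNotEmpty("file:///tmp/staged/output.csv", "s3://bucket-name/output.csv"); make sure FileSystems.setDefaultPipelineOptions(options) has been called so the s3:// scheme resolves to your configured filesystem.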
I can think of two other options:
- Use TextIO.write().withFooter(...) to always write a single empty line (or a space, or something else) at the end of the file to ensure it is not empty; a short sketch of this follows the code below.
- You could flatten your PCollection with a PCollection containing a single empty line iff the given PCollection is empty. (This is more complicated, but can be used more generally.) Concretely:
PCollection<String> pcollToWrite = ...

// This will count the number of elements in pcollToWrite at runtime.
PCollectionView<Long> toWriteSize =
    pcollToWrite.apply(Count.<String>globally().asSingletonView());

PCollection<String> emptyOrSingletonPCollection =
    p
        // Creates a PCollection with a single (empty-string) element.
        .apply(Create.of(Collections.singletonList("")))
        // Applies a ParDo that will emit this single element if and only if
        // toWriteSize is zero.
        .apply(ParDo.of(new DoFn<String, String>() {
          @ProcessElement
          public void processElement(
              @Element String mainElement, OutputReceiver<String> out, ProcessContext c) {
            if (c.sideInput(toWriteSize) == 0) {
              out.output("");
            }
          }
        }).withSideInputs(toWriteSize));

// We now flatten pcollToWrite and emptyOrSingletonPCollection together.
// The right hand side has exactly one element whenever the left hand side
// is empty, so there will always be at least one element.
PCollectionList.of(pcollToWrite, emptyOrSingletonPCollection)
    .apply(Flatten.pCollections())
    .apply(TextIO.write().to(...));
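For the first option, a minimal sketch applied to the question's write (assuming a single trailing footer line is enough to satisfy StorageGRID's minimum-size check, which is worth verifying):

pSource.apply(TextIO.write()
    .to("s3://bucket-name/empty.csv")
    .withoutSharding()
    // Always emit one footer line so the output file is never zero bytes.
    .withFooter(""));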