Received message larger than max on a batch processing pipeline
I've been getting this message on a batch pipeline that runs daily on Google's Cloud Dataflow service. It started failing with the following message:
(88b342a0e3852af3): java.io.IOException: INVALID_ARGUMENT: Received message larger than max (21824326 vs. 4194304)
dataflow-batch-jetty-11171129-7ea5-harness-waia talking to localhost:12346
    at com.google.cloud.dataflow.sdk.runners.worker.ApplianceShuffleWriter.close(Native Method)
    at com.google.cloud.dataflow.sdk.runners.worker.ChunkingShuffleEntryWriter.close(ChunkingShuffleEntryWriter.java:67)
    at com.google.cloud.dataflow.sdk.runners.worker.ShuffleSink$ShuffleSinkWriter.close(ShuffleSink.java:286)
    at com.google.cloud.dataflow.sdk.util.common.worker.WriteOperation.finish(WriteOperation.java:100)
    at com.google.cloud.dataflow.sdk.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:77)
    at com.google.cloud.dataflow.sdk.runners.worker.DataflowWorker.executeWork(DataflowWorker.java:264)
    at com.google.cloud.dataflow.sdk.runners.worker.DataflowWorker.doWork(DataflowWorker.java:197)
    at com.google.cloud.dataflow.sdk.runners.worker.DataflowWorker.getAndPerformWork(DataflowWorker.java:149)
    at com.google.cloud.dataflow.sdk.runners.worker.DataflowWorkerHarness$WorkerThread.doWork(DataflowWorkerHarness.java:192)
    at com.google.cloud.dataflow.sdk.runners.worker.DataflowWorkerHarness$WorkerThread.call(DataflowWorkerHarness.java:173)
    at com.google.cloud.dataflow.sdk.runners.worker.DataflowWorkerHarness$WorkerThread.call(DataflowWorkerHarness.java:160)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
I'm still using an old workaround to output a CSV file with headers, like so:
PCollection<String> output = data.apply(ParDo.of(new DoFn<String, String>() {
    String new_line = System.getProperty("line.separator");
    String csv_header = "id, stuff_1, stuff_2" + new_line;
    StringBuilder csv_body = new StringBuilder().append(csv_header);

    @Override
    public void processElement(ProcessContext c) {
        // Buffer every element of the bundle into one big string.
        csv_body.append(c.element()).append(new_line);
    }

    @Override
    public void finishBundle(Context c) throws Exception {
        // Emit the entire buffered bundle as a single element.
        c.output(csv_body.toString());
    }
})).apply(TextIO.Write.named("WriteData").to(options.getOutput()));
What is causing this? Is the output of this DoFn now too large? The size of the dataset being processed has not increased.
This looks like it may be a bug on our side, and we're investigating it, but in general the code is probably not doing what you intend it to do.
As written, you'll end up with an unspecified number of output files whose names start with the given prefix, each containing a concatenation of your expected CSV-like output (including the headers) for different chunks of the data, in an unspecified order.
To properly implement writing to CSV files, simply use TextIO.Write.withHeader() to specify the header, and remove your CSV-constructing ParDo entirely. This will also avoid triggering the bug.
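For reference, a minimal sketch of what that might look like, reusing the data PCollection and options from your snippet (the header string is the one from your original ParDo):

// Sketch of the suggested fix: each element of "data" is assumed to
// already be one fully formatted CSV row, so the CSV-assembling ParDo
// is dropped and TextIO writes the header itself.
data.apply(TextIO.Write
    .named("WriteData")
    .to(options.getOutput())
    .withHeader("id, stuff_1, stuff_2"));

Since each output shard is a self-contained file, the header is written at the top of every shard, so each file is valid CSV on its own.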