简单的 Apache Beam 操作工作起来非常慢
Simple Apache Beam manipulations work very slow
我是 Apache Beam 的新手,我的 Java 技能很低,但我想了解为什么我的简单条目操作在 Apache Beam 中运行如此缓慢。
我要执行的操作如下:我有一个 CSV 文件,其中包含以下方案的 100 万条记录(Alexa 前 100 万个站点):NUMBER,DOMAIN
(例如 1,google.com
),我想“剥离”第一个(数字)字段并仅获取域部分。此管道的代码如下:
package misc.examples;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
public class Example {
static class ExtractDomainsFn extends DoFn<String, String> {
private final Counter domains = Metrics.counter(ExtractDomainsFn.class, "domains");
@ProcessElement
public void processElement(ProcessContext c) {
if (c.element().contains(",")) {
domains.inc();
String domain = c.element().split(",")[1];
c.output(domain);
}
}
}
public static void main(String[] args) {
Pipeline p = Pipeline.create();
p.apply("ReadLines", TextIO.read().from("./top-1m.csv"))
.apply("ExtractDomains", ParDo.of(new ExtractDomainsFn()))
.apply("WriteDomains", TextIO.write().to("domains"));
p.run().waitUntilFinish();
}
}
当我使用 Maven 执行此代码时,在我的笔记本电脑上需要四分多钟才能成功:
$ mvn compile exec:java -Dexec.mainClass=misc.examples.Example
[INFO] Scanning for projects...
[INFO]
[INFO] ------------------------------------------------------------------------
[INFO] Building my-example 1.0.0
[INFO] ------------------------------------------------------------------------
[INFO]
[INFO] --- maven-resources-plugin:2.6:resources (default-resources) @ my-example ---
[INFO] Using 'UTF-8' encoding to copy filtered resources.
[INFO] skip non existing resourceDirectory /…/src/main/resources
[INFO]
[INFO] --- maven-compiler-plugin:3.5.1:compile (default-compile) @ my-example ---
[INFO] Nothing to compile - all classes are up to date
[INFO]
[INFO] --- exec-maven-plugin:1.4.0:java (default-cli) @ my-example ---
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 04:36 min
[INFO] Finished at: 2017-06-24T15:20:33+03:00
[INFO] Final Memory: 31M/1685M
[INFO] ------------------------------------------------------------------------
虽然简单的 cut(1)
在您眨眼之前就可以工作:
$time cut -d, -f2 top-1m.csv > domains
real 0m0.171s
user 0m0.140s
sys 0m0.028s
那么,这样的 Apache Beam 行为是否被认为是可以接受的(可能它在大量数据上工作得更好)还是我的代码效率低下?
01-07-2014 更新:
作为 Kenn Knowles ,我曾尝试 运行 其他 运行 管道而不是 DirectRunner
— DataflowRunner
。所以更新后的代码如下所示:
package misc.examples;
import org.apache.beam.runners.dataflow.DataflowRunner;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
public class Example {
static class ExtractDomainsFn extends DoFn<String, String> {
@ProcessElement
public void processElement(ProcessContext c) {
if (c.element().contains(",")) {
String domain = c.element().split(",")[1];
c.output(domain);
}
}
}
public static void main(String[] args) {
PipelineOptions options = PipelineOptionsFactory.create();
DataflowPipelineOptions dataflowOptions = options.as(DataflowPipelineOptions.class);
dataflowOptions.setRunner(DataflowRunner.class);
dataflowOptions.setProject("my-gcp-project-id");
Pipeline p = Pipeline.create(options);
p.apply("ReadLines", TextIO.read().from("gs://my-gcs-bucket/top-1m.csv"))
.apply("ExtractDomains", ParDo.of(new ExtractDomainsFn()))
.apply("WriteDomains", TextIO.write().to("gs://my-gcs-bucket/output/"));
p.run().waitUntilFinish();
}
}
已用时间 运行在 Google 上的数据流与直接 运行 相比较小,但仍然足够慢 — 比 3 分钟多一点:
Apache Beam 在 Apache Flink、Apache Spark、Apache Apex 和 Google Cloud Dataflow 等大规模数据处理引擎上提供正确的事件时间处理和可移植性。
在这里,看起来你是 运行 你的默认管道 DirectRunner
这是一种小规模测试管道正确性的方法(其中 "small" 意味着任何不使用多台机器)。为了测试正确性,runner 还执行额外的任务来帮助确保正确性,例如检查序列化 (Coder
) 和将元素按随机顺序放置以确保您的管道不依赖于顺序。
DirectRunner
并非 必须一次将所有值带入内存,但具有流式执行模型,因此它也适用于无限数据集和触发。与简单循环相比,这也增加了开销。
也就是说,四分钟很慢,我提交了 BEAM-2516 跟进。
您也可以在其他后端尝试 运行,特别是 SparkRunner
、FlinkRunner
和 ApexRunner
支持在您的笔记本电脑上嵌入执行。
回复2017-07-01更新:
虽然您在 Cloud Dataflow 上体验的总 运行 时间约为 3 分钟,但处理数据的实际时间约为 1 分钟。您可以在日志中看到这一点。其余的是启动和关闭工作虚拟机。我们一直在努力减少这种开销。为什么需要大约 1 分钟?您必须进行剖析才能找出答案(我很想听听结果!)但 Dataflow 肯定比 cut
做得更多:从 GCS 读取和写入,提供持久性和容错性,并且在TextIO
写入步骤它正在执行数据的网络随机播放,以便并行写入分片文件。如果 Dataflow 注意到您的计算没有并行性并且足够小以至于不需要它,那么显然可以优化一些东西。
但请记住,Beam 和 Cloud Dataflow 的存在是为了帮助您使用并行 处理无法在单台机器上及时处理的大量数据。因此,处理没有可用并行性的微小示例不是目标。
次要顺序计算通常作为大型管道的一小部分发生,但在实际物理计划的背景下,小型辅助计算通常不会影响端到端时间。 VM 管理的开销也是一次性成本,因此它们更有可能根据数十到数百台机器上的数分钟到数小时的计算来衡量。
我是 Apache Beam 的新手,我的 Java 技能很低,但我想了解为什么我的简单条目操作在 Apache Beam 中运行如此缓慢。
我要执行的操作如下:我有一个 CSV 文件,其中包含以下方案的 100 万条记录(Alexa 前 100 万个站点):NUMBER,DOMAIN
(例如 1,google.com
),我想“剥离”第一个(数字)字段并仅获取域部分。此管道的代码如下:
package misc.examples;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
public class Example {
static class ExtractDomainsFn extends DoFn<String, String> {
private final Counter domains = Metrics.counter(ExtractDomainsFn.class, "domains");
@ProcessElement
public void processElement(ProcessContext c) {
if (c.element().contains(",")) {
domains.inc();
String domain = c.element().split(",")[1];
c.output(domain);
}
}
}
public static void main(String[] args) {
Pipeline p = Pipeline.create();
p.apply("ReadLines", TextIO.read().from("./top-1m.csv"))
.apply("ExtractDomains", ParDo.of(new ExtractDomainsFn()))
.apply("WriteDomains", TextIO.write().to("domains"));
p.run().waitUntilFinish();
}
}
当我使用 Maven 执行此代码时,在我的笔记本电脑上需要四分多钟才能成功:
$ mvn compile exec:java -Dexec.mainClass=misc.examples.Example
[INFO] Scanning for projects...
[INFO]
[INFO] ------------------------------------------------------------------------
[INFO] Building my-example 1.0.0
[INFO] ------------------------------------------------------------------------
[INFO]
[INFO] --- maven-resources-plugin:2.6:resources (default-resources) @ my-example ---
[INFO] Using 'UTF-8' encoding to copy filtered resources.
[INFO] skip non existing resourceDirectory /…/src/main/resources
[INFO]
[INFO] --- maven-compiler-plugin:3.5.1:compile (default-compile) @ my-example ---
[INFO] Nothing to compile - all classes are up to date
[INFO]
[INFO] --- exec-maven-plugin:1.4.0:java (default-cli) @ my-example ---
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 04:36 min
[INFO] Finished at: 2017-06-24T15:20:33+03:00
[INFO] Final Memory: 31M/1685M
[INFO] ------------------------------------------------------------------------
虽然简单的 cut(1)
在您眨眼之前就可以工作:
$time cut -d, -f2 top-1m.csv > domains
real 0m0.171s
user 0m0.140s
sys 0m0.028s
那么,这样的 Apache Beam 行为是否被认为是可以接受的(可能它在大量数据上工作得更好)还是我的代码效率低下?
01-07-2014 更新:
作为 Kenn Knowles DirectRunner
— DataflowRunner
。所以更新后的代码如下所示:
package misc.examples;
import org.apache.beam.runners.dataflow.DataflowRunner;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
public class Example {
static class ExtractDomainsFn extends DoFn<String, String> {
@ProcessElement
public void processElement(ProcessContext c) {
if (c.element().contains(",")) {
String domain = c.element().split(",")[1];
c.output(domain);
}
}
}
public static void main(String[] args) {
PipelineOptions options = PipelineOptionsFactory.create();
DataflowPipelineOptions dataflowOptions = options.as(DataflowPipelineOptions.class);
dataflowOptions.setRunner(DataflowRunner.class);
dataflowOptions.setProject("my-gcp-project-id");
Pipeline p = Pipeline.create(options);
p.apply("ReadLines", TextIO.read().from("gs://my-gcs-bucket/top-1m.csv"))
.apply("ExtractDomains", ParDo.of(new ExtractDomainsFn()))
.apply("WriteDomains", TextIO.write().to("gs://my-gcs-bucket/output/"));
p.run().waitUntilFinish();
}
}
已用时间 运行在 Google 上的数据流与直接 运行 相比较小,但仍然足够慢 — 比 3 分钟多一点:
Apache Beam 在 Apache Flink、Apache Spark、Apache Apex 和 Google Cloud Dataflow 等大规模数据处理引擎上提供正确的事件时间处理和可移植性。
在这里,看起来你是 运行 你的默认管道 DirectRunner
这是一种小规模测试管道正确性的方法(其中 "small" 意味着任何不使用多台机器)。为了测试正确性,runner 还执行额外的任务来帮助确保正确性,例如检查序列化 (Coder
) 和将元素按随机顺序放置以确保您的管道不依赖于顺序。
DirectRunner
并非 必须一次将所有值带入内存,但具有流式执行模型,因此它也适用于无限数据集和触发。与简单循环相比,这也增加了开销。
也就是说,四分钟很慢,我提交了 BEAM-2516 跟进。
您也可以在其他后端尝试 运行,特别是 SparkRunner
、FlinkRunner
和 ApexRunner
支持在您的笔记本电脑上嵌入执行。
回复2017-07-01更新:
虽然您在 Cloud Dataflow 上体验的总 运行 时间约为 3 分钟,但处理数据的实际时间约为 1 分钟。您可以在日志中看到这一点。其余的是启动和关闭工作虚拟机。我们一直在努力减少这种开销。为什么需要大约 1 分钟?您必须进行剖析才能找出答案(我很想听听结果!)但 Dataflow 肯定比 cut
做得更多:从 GCS 读取和写入,提供持久性和容错性,并且在TextIO
写入步骤它正在执行数据的网络随机播放,以便并行写入分片文件。如果 Dataflow 注意到您的计算没有并行性并且足够小以至于不需要它,那么显然可以优化一些东西。
但请记住,Beam 和 Cloud Dataflow 的存在是为了帮助您使用并行 处理无法在单台机器上及时处理的大量数据。因此,处理没有可用并行性的微小示例不是目标。
次要顺序计算通常作为大型管道的一小部分发生,但在实际物理计划的背景下,小型辅助计算通常不会影响端到端时间。 VM 管理的开销也是一次性成本,因此它们更有可能根据数十到数百台机器上的数分钟到数小时的计算来衡量。