Apache Spark 以全分布式模式对执行器执行操作
Apache Spark take Action on Executors in fully distributed mode
我是 spark 的新手,我对转换和操作的工作原理有基本的了解 ()。我正在尝试对文本文件中的每一行(基本上是段落)进行一些 NLP 操作。处理后,结果应发送到服务器(REST Api)进行存储。该程序 运行 作为 spark 作业(使用 spark-submit 提交)在 yarn
模式的 10 个节点的集群上。这是我到目前为止所做的。
...
JavaSparkContext sc = new JavaSparkContext(conf);
JavaRDD<String> lines = sc.textFile("data.txt");
JavaRDD<String> processedLines = lines
.map(line -> {
// processed here
return result;
});
processedLines.foreach(line -> {
// Send to server
});
这有效,但 foreach
循环似乎是顺序的,它似乎不是 运行 在工作节点上的分布式模式下。我说的对吗?
我尝试了以下代码,但它不起作用。错误:java: incompatible types: inferred type does not conform to upper bound(s)
。显然这是错误的,因为 map
是一个转换,而不是一个动作。
lines.map(line -> { /* processing */ })
.map(line -> { /* Send to server */ });
我也试过 take()
,但它需要 int
并且 processedLines.count()
是 long
.
类型
processedLines.take(processedLines.count()).forEach(pl -> { /* Send to server */ });
数据量很大(大于100gb)。我想要的是处理和将其发送到服务器都应该在工作节点上完成。 map
中的处理部分挑衅地发生在工作节点上。但是我如何将处理后的数据从工作节点发送到服务器,因为 foreach
似乎是驱动程序中发生的顺序循环(如果我是正确的)。简单的说,如何在worker节点执行action
而不是在driver程序中执行
非常感谢任何帮助。
foreach
是spark中的一个动作。它基本上采用 RDD 的每个元素并将函数应用于该元素。
foreach
在executor节点或worker节点上执行。它不会 应用于驱动程序节点。请注意,在 运行 spark 的本地执行模式中,驱动程序和执行程序节点可以驻留在同一 JVM 上。
检查此以供参考foreach explanation
您尝试映射 RDD 的每个元素然后将 foreach
应用于每个元素的方法看起来不错。我能想到为什么要花时间的原因是你正在处理的数据大小(~100GB)。
对此进行优化的一种方法是 repartition
输入数据集。理想情况下,每个分区的大小应为 128MB,以获得更好的性能结果。您会找到许多关于对数据进行重新分区的最佳实践的文章。我建议您遵循它们,它会带来一些性能优势。
您可以想到的第二个优化是分配给每个执行程序节点的内存。它在进行火花调谐时起着非常重要的作用。
你能想到的第三个优化是,批量对服务器进行网络调用。您当前正在为 RDD 的每个元素对服务器进行网络调用。如果您的设计允许您对这些网络调用进行批处理,那么您可以在单个网络调用中发送超过 1 个元素。如果产生的延迟主要是由于这些网络调用,这也可能有帮助。
希望对您有所帮助。
首先,当您的代码 运行 在 Executors 上时,它现在已经处于分布式模式,当您想要利用 Executors 上的所有 CPU 资源以获得更多并行性时,您应该选择一些 async
选项,最好使用批处理模式操作,以避免如下所示过度创建客户端连接对象。
您可以将代码替换为
processedLines.foreach(line -> {
使用任一解决方案
processedLines.foreachAsync(line -> {
// Send to server
}).get();
//To iterate batch wise I would go for this
processedLines.foreachPartitionAsync(lineIterator -> {
// Create your ouput client connection here
while (lineIterator.hasNext()){
String line = lineIterator.next();
}
}).get();
这两个函数都将创建一个 Future 对象或提交一个新线程或一个解除阻塞的调用,这将自动为您的代码添加并行性。
我是 spark 的新手,我对转换和操作的工作原理有基本的了解 (yarn
模式的 10 个节点的集群上。这是我到目前为止所做的。
...
JavaSparkContext sc = new JavaSparkContext(conf);
JavaRDD<String> lines = sc.textFile("data.txt");
JavaRDD<String> processedLines = lines
.map(line -> {
// processed here
return result;
});
processedLines.foreach(line -> {
// Send to server
});
这有效,但 foreach
循环似乎是顺序的,它似乎不是 运行 在工作节点上的分布式模式下。我说的对吗?
我尝试了以下代码,但它不起作用。错误:java: incompatible types: inferred type does not conform to upper bound(s)
。显然这是错误的,因为 map
是一个转换,而不是一个动作。
lines.map(line -> { /* processing */ })
.map(line -> { /* Send to server */ });
我也试过 take()
,但它需要 int
并且 processedLines.count()
是 long
.
processedLines.take(processedLines.count()).forEach(pl -> { /* Send to server */ });
数据量很大(大于100gb)。我想要的是处理和将其发送到服务器都应该在工作节点上完成。 map
中的处理部分挑衅地发生在工作节点上。但是我如何将处理后的数据从工作节点发送到服务器,因为 foreach
似乎是驱动程序中发生的顺序循环(如果我是正确的)。简单的说,如何在worker节点执行action
而不是在driver程序中执行
非常感谢任何帮助。
foreach
是spark中的一个动作。它基本上采用 RDD 的每个元素并将函数应用于该元素。
foreach
在executor节点或worker节点上执行。它不会 应用于驱动程序节点。请注意,在 运行 spark 的本地执行模式中,驱动程序和执行程序节点可以驻留在同一 JVM 上。
检查此以供参考foreach explanation
您尝试映射 RDD 的每个元素然后将 foreach
应用于每个元素的方法看起来不错。我能想到为什么要花时间的原因是你正在处理的数据大小(~100GB)。
对此进行优化的一种方法是 repartition
输入数据集。理想情况下,每个分区的大小应为 128MB,以获得更好的性能结果。您会找到许多关于对数据进行重新分区的最佳实践的文章。我建议您遵循它们,它会带来一些性能优势。
您可以想到的第二个优化是分配给每个执行程序节点的内存。它在进行火花调谐时起着非常重要的作用。
你能想到的第三个优化是,批量对服务器进行网络调用。您当前正在为 RDD 的每个元素对服务器进行网络调用。如果您的设计允许您对这些网络调用进行批处理,那么您可以在单个网络调用中发送超过 1 个元素。如果产生的延迟主要是由于这些网络调用,这也可能有帮助。
希望对您有所帮助。
首先,当您的代码 运行 在 Executors 上时,它现在已经处于分布式模式,当您想要利用 Executors 上的所有 CPU 资源以获得更多并行性时,您应该选择一些 async
选项,最好使用批处理模式操作,以避免如下所示过度创建客户端连接对象。
您可以将代码替换为
processedLines.foreach(line -> {
使用任一解决方案
processedLines.foreachAsync(line -> {
// Send to server
}).get();
//To iterate batch wise I would go for this
processedLines.foreachPartitionAsync(lineIterator -> {
// Create your ouput client connection here
while (lineIterator.hasNext()){
String line = lineIterator.next();
}
}).get();
这两个函数都将创建一个 Future 对象或提交一个新线程或一个解除阻塞的调用,这将自动为您的代码添加并行性。