Apache Spark 以全分布式模式对执行器执行操作

Apache Spark take Action on Executors in fully distributed mode

我是 spark 的新手,我对转换和操作的工作原理有基本的了解 ()。我正在尝试对文本文件中的每一行(基本上是段落)进行一些 NLP 操作。处理后,结果应发送到服务器(REST Api)进行存储。该程序 运行 作为 spark 作业(使用 spark-submit 提交)在 yarn 模式的 10 个节点的集群上。这是我到目前为止所做的。

...
JavaSparkContext sc = new JavaSparkContext(conf);
JavaRDD<String> lines = sc.textFile("data.txt");
JavaRDD<String> processedLines = lines
    .map(line -> {
        // processed here
        return result;
    });
processedLines.foreach(line -> {
    // Send to server
});

这有效,但 foreach 循环似乎是顺序的,它似乎不是 运行 在工作节点上的分布式模式下。我说的对吗?

我尝试了以下代码,但它不起作用。错误:java: incompatible types: inferred type does not conform to upper bound(s)。显然这是错误的,因为 map 是一个转换,而不是一个动作。

lines.map(line -> { /* processing */ })
     .map(line -> { /* Send to server */ });

我也试过 take(),但它需要 int 并且 processedLines.count()long.

类型
processedLines.take(processedLines.count()).forEach(pl -> { /* Send to server */ });

数据量很大(大于100gb)。我想要的是处理和将其发送到服务器都应该在工作节点上完成。 map 中的处理部分挑衅地发生在工作节点上。但是我如何将处理后的数据从工作节点发送到服务器,因为 foreach 似乎是驱动程序中发生的顺序循环(如果我是正确的)。简单的说,如何在worker节点执行action而不是在driver程序中执行

非常感谢任何帮助。

foreach是spark中的一个动作。它基本上采用 RDD 的每个元素并将函数应用于该元素。

foreach在executor节点或worker节点上执行。它不会 应用于驱动程序节点。请注意,在 运行 spark 的本地执行模式中,驱动程序和执行程序节点可以驻留在同一 JVM 上。

检查此以供参考foreach explanation

您尝试映射 RDD 的每个元素然后将 foreach 应用于每个元素的方法看起来不错。我能想到为什么要花时间的原因是你正在处理的数据大小(~100GB)。

对此进行优化的一种方法是 repartition 输入数据集。理想情况下,每个分区的大小应为 128MB,以获得更好的性能结果。您会找到许多关于对数据进行重新分区的最佳实践的文章。我建议您遵循它们,它会带来一些性能优势。

您可以想到的第二个优化是分配给每个执行程序节点的内存。它在进行火花调谐时起着非常重要的作用。

你能想到的第三个优化是,批量对服务器进行网络调用。您当前正在为 RDD 的每个元素对服务器进行网络调用。如果您的设计允许您对这些网络调用进行批处理,那么您可以在单个网络调用中发送超过 1 个元素。如果产生的延迟主要是由于这些网络调用,这也可能有帮助。

希望对您有所帮助。

首先,当您的代码 运行 在 Executors 上时,它现在已经处于分布式模式,当您想要利用 Executors 上的所有 CPU 资源以获得更多并行性时,您应该选择一些 async 选项,最好使用批处理模式操作,以避免如下所示过度创建客户端连接对象。

您可以将代码替换为

processedLines.foreach(line -> {

使用任一解决方案

processedLines.foreachAsync(line -> {
    // Send to server
}).get();

//To iterate batch wise I would go for this
processedLines.foreachPartitionAsync(lineIterator -> {
// Create your ouput client connection here
    while (lineIterator.hasNext()){
        String line  = lineIterator.next();
    }
}).get();

这两个函数都将创建一个 Future 对象或提交一个新线程或一个解除阻塞的调用,这将自动为您的代码添加并行性。