读取多个文件并收集是否将它们带到火花中的驱动程序

Does reading multiple files & collect bring them to driver in spark

代码片段:

val inp = sc.textFile("C:\mk\logdir\foldera\foldera1\log.txt").collect.mkString(" ")

我知道上面的代码读取整个文件并将它们组合成一个字符串并执行驱动程序节点(单次执行。不是并行执行)。

 val inp = sc.textFile("C:\mk\logdir\*\*\log.txt")
 code block{ }
 sc.stop

Q1 )我正在读取多个文件(存在于上述文件夹结构中)。我相信在这种情况下,每个文件都将被创建为分区,并将被发送到单独的节点并并行执行。我的理解正确吗?有人可以证实这一点吗?或者有什么可以系统地确认的吗?

val inp = sc.textFile("C:\mk\logdir\*\*\log.txt")
val cont = inp.collect.mkString(" ")
 code block{ }
 sc.stop

Q2) spark 如何处理这种情况。虽然我正在收集,但我认为它不会收集所有文件的所有内容,而只会收集一个文件。我对吗?有人可以帮助我理解这一点吗?

非常感谢您的宝贵时间和帮助。

Spark 是一种用于大规模数据处理的快速通用引擎。它并行处理所有数据。因此,要回答第一个问题,然后,在以下情况下:

val inp = sc.textFile("C:\mk\logdir\*\*\log.txt")
code block{ }
sc.stop

每个文件将被创建为分区,并将被发送到单独的节点并并行执行。但是,根据文件的大小,分区数可能大于正在处理的文件数。例如,如果 folder1folder2 中的 log.txt 的大小只有几 KB,那么只会创建 2 个分区,因为会有 2 个文件,它们将被并行处理。

但是,如果 folder1 中的 log.txt 的大小以 GB 为单位,则会为其创建多个分区,并且分区数将大于文件数。

但是,我们总是可以使用 repartition()coalesce() 方法更改 RDD 的分区数。

要回答第二个问题,那么在以下情况下:

val inp = sc.textFile("C:\mk\logdir\*\*\log.txt")
val cont = inp.collect.mkString(" ")
code block{ }
sc.stop

Spark 将从所有文件中收集内容,而不仅仅是从一个文件中收集内容。因为,collect()的意思是获取存储在一个RDD中的所有内容,并以集合的形式返回给Driver。

Q1 )Here I am reading multiple files (which are present in above folder structure). I believe in this case each file will be created as partition & will be sent to separate node & executed parallely. Am I correct in my understanding? Can someone confirm this? Or is there anyway i can confirm it systematically?

答案:

SparkContext 的 TextFile 方法,即 sc.textFile 创建一个 RDD,每行作为一个元素。如果数据中有 10 个文件,即 yourtextfilesfolder 文件夹,将创建 10 个分区。您可以通过以下方式验证分区数:

yourtextfilesfolder.partitions.length

但是,分区是由数据局部性决定的。这可能会导致默认情况下分区太少。据我所知,不能保证会创建一个分区,请参阅“SparkContext.textFile”的代码。

& 'minPartitions' - 生成的 RDD

建议的最小分区数

为了更好地理解,请参阅下面的方法。

/**
           * Read a text file from HDFS, a local file system (available on all nodes), or any
           * Hadoop-supported file system URI, and return it as an RDD of Strings.
           */
          def textFile(
              path: String,
              minPartitions: Int = defaultMinPartitions): RDD[String] = withScope {
            assertNotStopped()
            hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text],
              minPartitions).map(pair => pair._2.toString).setName(path)
          }

you can mention minPartitions as shown above from SparkContext.scala

Q2) How the spark handles this case. though I am doing collect, I assume that it will not collect all content from all files but just the one file . Am I right? Can someone help me understand this?

答案:您的 rdd 由多个文本文件构成。所以 collect 将从所有分区收集到来自不同文件的驱动程序,而不是一次一个文件。

您可以验证:使用 rdd.collect


但是,如果你想读取多个文本文件,你也可以使用 wholeTextFiles 请看下面方法中的@note 小文件是首选,大文件也是允许的,但可能会导致性能不佳。

spark-core-sc-textfile-vs-sc-wholetextfiles

文档:

RDD> wholeTextFiles(String path, int minPartitions) Read a directory of text files from HDFS, a local file system (available on all nodes), or any Hadoop-supported file system URI.

/**
   * Read a directory of text files from HDFS, a local file system (available on all nodes), or any
   * Hadoop-supported file system URI. Each file is read as a single record and returned in a
   * key-value pair, where the key is the path of each file, the value is the content of each file.
   *
   * <p> For example, if you have the following files:
   * {{{
   *   hdfs://a-hdfs-path/part-00000
   *   hdfs://a-hdfs-path/part-00001
   *   ...
   *   hdfs://a-hdfs-path/part-nnnnn
   * }}}
   *
   * Do `val rdd = sparkContext.wholeTextFile("hdfs://a-hdfs-path")`,
   *
   * <p> then `rdd` contains
   * {{{
   *   (a-hdfs-path/part-00000, its content)
   *   (a-hdfs-path/part-00001, its content)
   *   ...
   *   (a-hdfs-path/part-nnnnn, its content)
   * }}}
   *
   * @note Small files are preferred, large file is also allowable, but may cause bad performance.
   * @note On some filesystems, `.../path/&#42;` can be a more efficient way to read all files
   *       in a directory rather than `.../path/` or `.../path`
   * @note Partitioning is determined by data locality. This may result in too few partitions
   *       by default.
   *
   * @param path Directory to the input data files, the path can be comma separated paths as the
   *             list of inputs.
   * @param minPartitions A suggestion value of the minimal splitting number for input data.
   * @return RDD representing tuples of file path and the corresponding file content
   */
  def wholeTextFiles(
      path: String,
      minPartitions: Int = defaultMinPartitions): RDD[(String, String)] = withScope {
.....
  }

示例:

val distFile = sc.textFile("data.txt")
Above command returns the content of the file:
scala> distFile.collect()
res16: Array[String] = Array(1,2,3, 4,5,6)


 SparkContext.wholeTextFiles can return (filename, content).
    val distFile = sc.wholeTextFiles("/tmp/tmpdir")

scala> distFile.collect()
res17: Array[(String, String)] =
Array((maprfs:/tmp/tmpdir/data3.txt,"1,2,3
4,5,6
"), (maprfs:/tmp/tmpdir/data.txt,"1,2,3
4,5,6
"), (maprfs:/tmp/tmpdir/data2.txt,"1,2,3
4,5,6
"))

在你的情况下,我更喜欢 SparkContext.wholeTextFiles 如果你想要的话,你可以在如上所述收集后获取文件名及其内容。