我们是否应该像训练前并行化 Seq 那样并行化 DataFrame?

Should we parallelize a DataFrame like we parallelize a Seq before training

考虑此处给出的代码,

https://spark.apache.org/docs/1.2.0/ml-guide.html

import org.apache.spark.ml.classification.LogisticRegression
val training = sparkContext.parallelize(Seq(
  LabeledPoint(1.0, Vectors.dense(0.0, 1.1, 0.1)),
  LabeledPoint(0.0, Vectors.dense(2.0, 1.0, -1.0)),
  LabeledPoint(0.0, Vectors.dense(2.0, 1.3, 1.0)),
  LabeledPoint(1.0, Vectors.dense(0.0, 1.2, -0.5))))

val lr = new LogisticRegression()
lr.setMaxIter(10).setRegParam(0.01)

val model1 = lr.fit(training)

假设我们使用 sqlContext.read() 将 "training" 读取为数据帧,应该 我们仍然会做类似

的事情
val model1 = lr.fit(sparkContext.parallelize(training)) // or some variation of this

或者 fit 函数将在传递数据帧时自动处理计算/数据的并行化

此致,

DataFrame是分布式数据结构。 parallelize 它既不需要也不可能。 SparkConext.parallelize 方法仅用于驻留在驱动程序内存中的分布式本地数据结构。您不应该习惯于分发大型数据集,更不用说重新分发 RDDs 或更高级别的数据结构(就像您在上一个问题中所做的那样)

sc.parallelize(trainingData.collect()) 

如果您想在 RDD / Dataframe (Dataset) 之间进行转换,请使用专门用于此目的的方法:

  1. DataFrameRDD:

    import org.apache.spark.sql.DataFrame
    import org.apache.spark.sql.Row
    import org.apache.spark.rdd.RDD
    
    val df: DataFrame  = Seq(("foo", 1), ("bar", 2)).toDF("k", "v")
    val rdd: RDD[Row] = df.rdd
    
  2. RDDDataFrame:

    val rdd: RDD[(String, Int)] = sc.parallelize(Seq(("foo", 1), ("bar", 2)))
    val df1: DataFrame = rdd.toDF
    // or
    val df2: DataFrame = spark.createDataFrame(rdd) // From 1.x use sqlContext
    

您也许应该检查一下 RDD 和 DataFrame 之间的区别以及如何在两者之间进行转换:

直接回答您的问题:DataFrame 已经针对并行执行进行了优化。您无需执行任何操作,您可以将其直接传递给任何火花估算器 fit() 方法。并行执行在后台处理。