我们是否应该像训练前并行化 Seq 那样并行化 DataFrame?
Should we parallelize a DataFrame like we parallelize a Seq before training
考虑此处给出的代码,
https://spark.apache.org/docs/1.2.0/ml-guide.html
import org.apache.spark.ml.classification.LogisticRegression
val training = sparkContext.parallelize(Seq(
LabeledPoint(1.0, Vectors.dense(0.0, 1.1, 0.1)),
LabeledPoint(0.0, Vectors.dense(2.0, 1.0, -1.0)),
LabeledPoint(0.0, Vectors.dense(2.0, 1.3, 1.0)),
LabeledPoint(1.0, Vectors.dense(0.0, 1.2, -0.5))))
val lr = new LogisticRegression()
lr.setMaxIter(10).setRegParam(0.01)
val model1 = lr.fit(training)
假设我们使用 sqlContext.read() 将 "training" 读取为数据帧,应该
我们仍然会做类似
的事情
val model1 = lr.fit(sparkContext.parallelize(training)) // or some variation of this
或者 fit 函数将在传递数据帧时自动处理计算/数据的并行化
此致,
DataFrame
是分布式数据结构。 parallelize
它既不需要也不可能。 SparkConext.parallelize
方法仅用于驻留在驱动程序内存中的分布式本地数据结构。您不应该习惯于分发大型数据集,更不用说重新分发 RDDs
或更高级别的数据结构(就像您在上一个问题中所做的那样)
sc.parallelize(trainingData.collect())
如果您想在 RDD
/ Dataframe
(Dataset
) 之间进行转换,请使用专门用于此目的的方法:
从 DataFrame
到 RDD
:
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.Row
import org.apache.spark.rdd.RDD
val df: DataFrame = Seq(("foo", 1), ("bar", 2)).toDF("k", "v")
val rdd: RDD[Row] = df.rdd
从 RDD
到 DataFrame
:
val rdd: RDD[(String, Int)] = sc.parallelize(Seq(("foo", 1), ("bar", 2)))
val df1: DataFrame = rdd.toDF
// or
val df2: DataFrame = spark.createDataFrame(rdd) // From 1.x use sqlContext
您也许应该检查一下 RDD 和 DataFrame 之间的区别以及如何在两者之间进行转换:
直接回答您的问题:DataFrame 已经针对并行执行进行了优化。您无需执行任何操作,您可以将其直接传递给任何火花估算器 fit() 方法。并行执行在后台处理。
考虑此处给出的代码,
https://spark.apache.org/docs/1.2.0/ml-guide.html
import org.apache.spark.ml.classification.LogisticRegression
val training = sparkContext.parallelize(Seq(
LabeledPoint(1.0, Vectors.dense(0.0, 1.1, 0.1)),
LabeledPoint(0.0, Vectors.dense(2.0, 1.0, -1.0)),
LabeledPoint(0.0, Vectors.dense(2.0, 1.3, 1.0)),
LabeledPoint(1.0, Vectors.dense(0.0, 1.2, -0.5))))
val lr = new LogisticRegression()
lr.setMaxIter(10).setRegParam(0.01)
val model1 = lr.fit(training)
假设我们使用 sqlContext.read() 将 "training" 读取为数据帧,应该 我们仍然会做类似
的事情val model1 = lr.fit(sparkContext.parallelize(training)) // or some variation of this
或者 fit 函数将在传递数据帧时自动处理计算/数据的并行化
此致,
DataFrame
是分布式数据结构。 parallelize
它既不需要也不可能。 SparkConext.parallelize
方法仅用于驻留在驱动程序内存中的分布式本地数据结构。您不应该习惯于分发大型数据集,更不用说重新分发 RDDs
或更高级别的数据结构(就像您在上一个问题中所做的那样)
sc.parallelize(trainingData.collect())
如果您想在 RDD
/ Dataframe
(Dataset
) 之间进行转换,请使用专门用于此目的的方法:
从
DataFrame
到RDD
:import org.apache.spark.sql.DataFrame import org.apache.spark.sql.Row import org.apache.spark.rdd.RDD val df: DataFrame = Seq(("foo", 1), ("bar", 2)).toDF("k", "v") val rdd: RDD[Row] = df.rdd
从
RDD
到DataFrame
:val rdd: RDD[(String, Int)] = sc.parallelize(Seq(("foo", 1), ("bar", 2))) val df1: DataFrame = rdd.toDF // or val df2: DataFrame = spark.createDataFrame(rdd) // From 1.x use sqlContext
您也许应该检查一下 RDD 和 DataFrame 之间的区别以及如何在两者之间进行转换:
直接回答您的问题:DataFrame 已经针对并行执行进行了优化。您无需执行任何操作,您可以将其直接传递给任何火花估算器 fit() 方法。并行执行在后台处理。