如何计算残差的相关性? Spark-Scala
How can I calculate the correlation of my residuals ? Spark-Scala
我需要知道我的残差是否相关。我没有找到在 Databricks 上使用 Spark-Scala 的方法。
我得出结论,我应该将我的项目导出到 R 以使用 acf function。
有人知道在 Databricks 上使用 Spark-Scala 的技巧吗?
对于那些需要更多信息的人:我目前正在研究销售预测。我使用了具有不同特征的回归森林。然后,我需要评估我的预测质量。为了检查这一点,我在 paper 上读到残差是查看您的预测模型好坏的好方法。在任何情况下,您仍然可以改进它,但这只是对我的预测模型发表我的意见并将其与其他模型进行比较。
目前,我有一个如下所示的数据框。这是测试 data/out-of-sample 数据的一部分。 (我将预测和残差转换为 IntegerType,这就是为什么在第 3 行 40 - 17 = 22)
我正在使用 Spark 2.1.1
.
您可以使用 spark ml library function
找到列之间的相关性
让我们先导入 类.
import org.apache.spark.sql.functions.corr
import org.apache.spark.mllib.linalg._
import org.apache.spark.mllib.stat.Statistics
现在准备输入 DataFrame :
scala> val seqRow = Seq(
| ("2017-04-27",13,21),
| ("2017-04-26",7,16),
| ("2017-04-25",40,17),
| ("2017-04-24",17,17),
| ("2017-04-21",10,20),
| ("2017-04-20",9,19),
| ("2017-04-19",30,30),
| ("2017-04-18",18,25),
| ("2017-04-14",32,28),
| ("2017-04-13",39,18),
| ("2017-04-12",2,4),
| ("2017-04-11",8,24),
| ("2017-04-10",18,27),
| ("2017-04-07",6,17),
| ("2017-04-06",13,29),
| ("2017-04-05",10,17),
| ("2017-04-04",6,8),
| ("2017-04-03",20,32)
| )
seqRow: Seq[(String, Int, Int)] = List((2017-04-27,13,21), (2017-04-26,7,16), (2017-04-25,40,17), (2017-04-24,17,17), (2017-04-21,10,20), (2017-04-20,9,19), (2017-04-19,30,30), (2017-04-18,18,25), (2017-04-14,32,28), (2017-04-13,39,18), (2017-04-12,2,4), (2017-04-11,8,24), (2017-04-10,18,27), (2017-04-07,6,17), (2017-04-06,13,29), (2017-04-05,10,17), (2017-04-04,6,8), (2017-04-03,20,32))
scala> val rdd = sc.parallelize(seqRow)
rdd: org.apache.spark.rdd.RDD[(String, Int, Int)] = ParallelCollectionRDD[51] at parallelize at <console>:34
scala> val input_df = spark.createDataFrame(rdd).toDF("date","amount","prediction").withColumn("residuals",'amount - 'prediction)
input_df: org.apache.spark.sql.DataFrame = [date: string, amount: int ... 2 more fields]
scala> input_df.show(false)
+----------+------+----------+---------+
|date |amount|prediction|residuals|
+----------+------+----------+---------+
|2017-04-27|13 |21 |-8 |
|2017-04-26|7 |16 |-9 |
|2017-04-25|40 |17 |23 |
|2017-04-24|17 |17 |0 |
|2017-04-21|10 |20 |-10 |
|2017-04-20|9 |19 |-10 |
|2017-04-19|30 |30 |0 |
|2017-04-18|18 |25 |-7 |
|2017-04-14|32 |28 |4 |
|2017-04-13|39 |18 |21 |
|2017-04-12|2 |4 |-2 |
|2017-04-11|8 |24 |-16 |
|2017-04-10|18 |27 |-9 |
|2017-04-07|6 |17 |-11 |
|2017-04-06|13 |29 |-16 |
|2017-04-05|10 |17 |-7 |
|2017-04-04|6 |8 |-2 |
|2017-04-03|20 |32 |-12 |
+----------+------+----------+---------+
行 2017-04-14
和 2017-04-13
的 residuals
的值不匹配,因为我正在为 residuals
减去 amount - prediction
现在继续计算所有列之间的相关性。
如果列数较多并且您需要每列与其他列之间的相关性,则此方法用于计算相关性。
首先我们去掉不需要计算相关性的列
scala> val drop_date_df = input_df.drop('date)
drop_date_df: org.apache.spark.sql.DataFrame = [amount: int, prediction: int ... 1 more field]
scala> drop_date_df.show
+------+----------+---------+
|amount|prediction|residuals|
+------+----------+---------+
| 13| 21| -8|
| 7| 16| -9|
| 40| 17| 23|
| 17| 17| 0|
| 10| 20| -10|
| 9| 19| -10|
| 30| 30| 0|
| 18| 25| -7|
| 32| 28| 4|
| 39| 18| 21|
| 2| 4| -2|
| 8| 24| -16|
| 18| 27| -9|
| 6| 17| -11|
| 13| 29| -16|
| 10| 17| -7|
| 6| 8| -2|
| 20| 32| -12|
+------+----------+---------+
由于关联的列超过2列,我们需要找到关联矩阵。
为了计算 相关矩阵 我们需要 RDD[Vector] 正如你在相关的 spark 示例中看到的那样。
scala> val dense_rdd = drop_date_df.rdd.map{row =>
| val first = row.getAs[Integer]("amount")
| val second = row.getAs[Integer]("prediction")
| val third = row.getAs[Integer]("residuals")
| Vectors.dense(first.toDouble,second.toDouble,third.toDouble)}
dense_rdd: org.apache.spark.rdd.RDD[org.apache.spark.mllib.linalg.Vector] = MapPartitionsRDD[62] at map at <console>:40
scala> val correlMatrix: Matrix = Statistics.corr(dense_rdd, "pearson")
correlMatrix: org.apache.spark.mllib.linalg.Matrix =
1.0 0.40467032516705076 0.782939330961529
0.40467032516705076 1.0 -0.2520531290688281
0.782939330961529 -0.2520531290688281 1.0
列的顺序保持不变,但您去掉了列名。
您可以找到有关相关矩阵结构的好资源。
因为你想找到残差与其他两列的相关性。
我们可以探索其他选择
蜂巢 corr UDAF
scala> drop_date_df.createOrReplaceTempView("temp_table")
scala> val corr_query_df = spark.sql("select corr(amount,residuals) as amount_residuals_corr,corr(prediction,residuals) as prediction_residual_corr from temp_table")
corr_query_df: org.apache.spark.sql.DataFrame = [amount_residuals_corr: double, prediction_residual_corr: double]
scala> corr_query_df.show
+---------------------+------------------------+
|amount_residuals_corr|prediction_residual_corr|
+---------------------+------------------------+
| 0.7829393309615287| -0.252053129068828|
+---------------------+------------------------+
Spark corr 函数 link
scala> val corr_df = drop_date_df.select(
| corr('amount,'residuals).as("amount_residuals_corr"),
| corr('prediction,'residuals).as("prediction_residual_corr"))
corr_df: org.apache.spark.sql.DataFrame = [amount_residuals_corr: double, prediction_residual_corr: double]
scala> corr_df.show
+---------------------+------------------------+
|amount_residuals_corr|prediction_residual_corr|
+---------------------+------------------------+
| 0.7829393309615287| -0.252053129068828|
+---------------------+------------------------+
我需要知道我的残差是否相关。我没有找到在 Databricks 上使用 Spark-Scala 的方法。 我得出结论,我应该将我的项目导出到 R 以使用 acf function。
有人知道在 Databricks 上使用 Spark-Scala 的技巧吗?
对于那些需要更多信息的人:我目前正在研究销售预测。我使用了具有不同特征的回归森林。然后,我需要评估我的预测质量。为了检查这一点,我在 paper 上读到残差是查看您的预测模型好坏的好方法。在任何情况下,您仍然可以改进它,但这只是对我的预测模型发表我的意见并将其与其他模型进行比较。
目前,我有一个如下所示的数据框。这是测试 data/out-of-sample 数据的一部分。 (我将预测和残差转换为 IntegerType,这就是为什么在第 3 行 40 - 17 = 22)
我正在使用 Spark 2.1.1
.
您可以使用 spark ml library function
找到列之间的相关性让我们先导入 类.
import org.apache.spark.sql.functions.corr
import org.apache.spark.mllib.linalg._
import org.apache.spark.mllib.stat.Statistics
现在准备输入 DataFrame :
scala> val seqRow = Seq(
| ("2017-04-27",13,21),
| ("2017-04-26",7,16),
| ("2017-04-25",40,17),
| ("2017-04-24",17,17),
| ("2017-04-21",10,20),
| ("2017-04-20",9,19),
| ("2017-04-19",30,30),
| ("2017-04-18",18,25),
| ("2017-04-14",32,28),
| ("2017-04-13",39,18),
| ("2017-04-12",2,4),
| ("2017-04-11",8,24),
| ("2017-04-10",18,27),
| ("2017-04-07",6,17),
| ("2017-04-06",13,29),
| ("2017-04-05",10,17),
| ("2017-04-04",6,8),
| ("2017-04-03",20,32)
| )
seqRow: Seq[(String, Int, Int)] = List((2017-04-27,13,21), (2017-04-26,7,16), (2017-04-25,40,17), (2017-04-24,17,17), (2017-04-21,10,20), (2017-04-20,9,19), (2017-04-19,30,30), (2017-04-18,18,25), (2017-04-14,32,28), (2017-04-13,39,18), (2017-04-12,2,4), (2017-04-11,8,24), (2017-04-10,18,27), (2017-04-07,6,17), (2017-04-06,13,29), (2017-04-05,10,17), (2017-04-04,6,8), (2017-04-03,20,32))
scala> val rdd = sc.parallelize(seqRow)
rdd: org.apache.spark.rdd.RDD[(String, Int, Int)] = ParallelCollectionRDD[51] at parallelize at <console>:34
scala> val input_df = spark.createDataFrame(rdd).toDF("date","amount","prediction").withColumn("residuals",'amount - 'prediction)
input_df: org.apache.spark.sql.DataFrame = [date: string, amount: int ... 2 more fields]
scala> input_df.show(false)
+----------+------+----------+---------+
|date |amount|prediction|residuals|
+----------+------+----------+---------+
|2017-04-27|13 |21 |-8 |
|2017-04-26|7 |16 |-9 |
|2017-04-25|40 |17 |23 |
|2017-04-24|17 |17 |0 |
|2017-04-21|10 |20 |-10 |
|2017-04-20|9 |19 |-10 |
|2017-04-19|30 |30 |0 |
|2017-04-18|18 |25 |-7 |
|2017-04-14|32 |28 |4 |
|2017-04-13|39 |18 |21 |
|2017-04-12|2 |4 |-2 |
|2017-04-11|8 |24 |-16 |
|2017-04-10|18 |27 |-9 |
|2017-04-07|6 |17 |-11 |
|2017-04-06|13 |29 |-16 |
|2017-04-05|10 |17 |-7 |
|2017-04-04|6 |8 |-2 |
|2017-04-03|20 |32 |-12 |
+----------+------+----------+---------+
行 2017-04-14
和 2017-04-13
的 residuals
的值不匹配,因为我正在为 residuals
amount - prediction
现在继续计算所有列之间的相关性。 如果列数较多并且您需要每列与其他列之间的相关性,则此方法用于计算相关性。
首先我们去掉不需要计算相关性的列
scala> val drop_date_df = input_df.drop('date)
drop_date_df: org.apache.spark.sql.DataFrame = [amount: int, prediction: int ... 1 more field]
scala> drop_date_df.show
+------+----------+---------+
|amount|prediction|residuals|
+------+----------+---------+
| 13| 21| -8|
| 7| 16| -9|
| 40| 17| 23|
| 17| 17| 0|
| 10| 20| -10|
| 9| 19| -10|
| 30| 30| 0|
| 18| 25| -7|
| 32| 28| 4|
| 39| 18| 21|
| 2| 4| -2|
| 8| 24| -16|
| 18| 27| -9|
| 6| 17| -11|
| 13| 29| -16|
| 10| 17| -7|
| 6| 8| -2|
| 20| 32| -12|
+------+----------+---------+
由于关联的列超过2列,我们需要找到关联矩阵。 为了计算 相关矩阵 我们需要 RDD[Vector] 正如你在相关的 spark 示例中看到的那样。
scala> val dense_rdd = drop_date_df.rdd.map{row =>
| val first = row.getAs[Integer]("amount")
| val second = row.getAs[Integer]("prediction")
| val third = row.getAs[Integer]("residuals")
| Vectors.dense(first.toDouble,second.toDouble,third.toDouble)}
dense_rdd: org.apache.spark.rdd.RDD[org.apache.spark.mllib.linalg.Vector] = MapPartitionsRDD[62] at map at <console>:40
scala> val correlMatrix: Matrix = Statistics.corr(dense_rdd, "pearson")
correlMatrix: org.apache.spark.mllib.linalg.Matrix =
1.0 0.40467032516705076 0.782939330961529
0.40467032516705076 1.0 -0.2520531290688281
0.782939330961529 -0.2520531290688281 1.0
列的顺序保持不变,但您去掉了列名。 您可以找到有关相关矩阵结构的好资源。
因为你想找到残差与其他两列的相关性。 我们可以探索其他选择
蜂巢 corr UDAF
scala> drop_date_df.createOrReplaceTempView("temp_table")
scala> val corr_query_df = spark.sql("select corr(amount,residuals) as amount_residuals_corr,corr(prediction,residuals) as prediction_residual_corr from temp_table")
corr_query_df: org.apache.spark.sql.DataFrame = [amount_residuals_corr: double, prediction_residual_corr: double]
scala> corr_query_df.show
+---------------------+------------------------+
|amount_residuals_corr|prediction_residual_corr|
+---------------------+------------------------+
| 0.7829393309615287| -0.252053129068828|
+---------------------+------------------------+
Spark corr 函数 link
scala> val corr_df = drop_date_df.select(
| corr('amount,'residuals).as("amount_residuals_corr"),
| corr('prediction,'residuals).as("prediction_residual_corr"))
corr_df: org.apache.spark.sql.DataFrame = [amount_residuals_corr: double, prediction_residual_corr: double]
scala> corr_df.show
+---------------------+------------------------+
|amount_residuals_corr|prediction_residual_corr|
+---------------------+------------------------+
| 0.7829393309615287| -0.252053129068828|
+---------------------+------------------------+