How can I calculate the correlation of my residuals? Spark-Scala

I need to know whether my residuals are correlated. I haven't found a way to do it with Spark-Scala on Databricks, so I concluded that I should export my project to R and use the acf function there.

Does anyone know a trick to do it with Spark-Scala on Databricks?

For those who need more information: I am currently working on sales forecasting. I used a regression forest with different features, and now I need to evaluate the quality of my forecast. To check this, I read in a paper that the residuals are a good way to see whether your forecasting model is good or bad. You can always improve it further, but this is just so I can form an opinion on my forecasting model and compare it with other models.

Currently, I have a DataFrame like the one below. It is part of the test / out-of-sample data. (I cast my predictions and residuals to IntegerType, which is why on the 3rd row 40 - 17 = 22.)

I am using Spark 2.1.1.

You can use the Spark MLlib library functions to find the correlation between columns.

Let's import the required classes first.

import org.apache.spark.sql.functions.corr
import org.apache.spark.mllib.linalg._
import org.apache.spark.mllib.stat.Statistics

Now prepare the input DataFrame:

scala> val seqRow = Seq(
     |     ("2017-04-27",13,21),
     |     ("2017-04-26",7,16),
     |     ("2017-04-25",40,17),
     |     ("2017-04-24",17,17),
     |     ("2017-04-21",10,20),
     |     ("2017-04-20",9,19),
     |     ("2017-04-19",30,30),
     |     ("2017-04-18",18,25),
     |     ("2017-04-14",32,28),
     |     ("2017-04-13",39,18),
     |     ("2017-04-12",2,4),
     |     ("2017-04-11",8,24),
     |     ("2017-04-10",18,27),
     |     ("2017-04-07",6,17),
     |     ("2017-04-06",13,29),
     |     ("2017-04-05",10,17),
     |     ("2017-04-04",6,8),
     |     ("2017-04-03",20,32)
     | )
seqRow: Seq[(String, Int, Int)] = List((2017-04-27,13,21), (2017-04-26,7,16), (2017-04-25,40,17), (2017-04-24,17,17), (2017-04-21,10,20), (2017-04-20,9,19), (2017-04-19,30,30), (2017-04-18,18,25), (2017-04-14,32,28), (2017-04-13,39,18), (2017-04-12,2,4), (2017-04-11,8,24), (2017-04-10,18,27), (2017-04-07,6,17), (2017-04-06,13,29), (2017-04-05,10,17), (2017-04-04,6,8), (2017-04-03,20,32))

scala> val rdd = sc.parallelize(seqRow)
rdd: org.apache.spark.rdd.RDD[(String, Int, Int)] = ParallelCollectionRDD[51] at parallelize at <console>:34

scala> val input_df = spark.createDataFrame(rdd).toDF("date","amount","prediction").withColumn("residuals",'amount - 'prediction)
input_df: org.apache.spark.sql.DataFrame = [date: string, amount: int ... 2 more fields]

scala> input_df.show(false)
+----------+------+----------+---------+
|date      |amount|prediction|residuals|
+----------+------+----------+---------+
|2017-04-27|13    |21        |-8       |
|2017-04-26|7     |16        |-9       |
|2017-04-25|40    |17        |23       |
|2017-04-24|17    |17        |0        |
|2017-04-21|10    |20        |-10      |
|2017-04-20|9     |19        |-10      |
|2017-04-19|30    |30        |0        |
|2017-04-18|18    |25        |-7       |
|2017-04-14|32    |28        |4        |
|2017-04-13|39    |18        |21       |
|2017-04-12|2     |4         |-2       |
|2017-04-11|8     |24        |-16      |
|2017-04-10|18    |27        |-9       |
|2017-04-07|6     |17        |-11      |
|2017-04-06|13    |29        |-16      |
|2017-04-05|10    |17        |-7       |
|2017-04-04|6     |8         |-2       |
|2017-04-03|20    |32        |-12      |
+----------+------+----------+---------+

The residuals values for 2017-04-14 and 2017-04-13 don't match yours, because I am computing residuals as amount - prediction.

Now let's calculate the correlation between all the columns. This is the method to use when you have a larger number of columns and need the correlation of each column with every other column.

First we drop the column for which we don't need to calculate the correlation:

scala> val drop_date_df = input_df.drop('date)
drop_date_df: org.apache.spark.sql.DataFrame = [amount: int, prediction: int ... 1 more field]

scala> drop_date_df.show
+------+----------+---------+
|amount|prediction|residuals|
+------+----------+---------+
|    13|        21|       -8|
|     7|        16|       -9|
|    40|        17|       23|
|    17|        17|        0|
|    10|        20|      -10|
|     9|        19|      -10|
|    30|        30|        0|
|    18|        25|       -7|
|    32|        28|        4|
|    39|        18|       21|
|     2|         4|       -2|
|     8|        24|      -16|
|    18|        27|       -9|
|     6|        17|      -11|
|    13|        29|      -16|
|    10|        17|       -7|
|     6|         8|       -2|
|    20|        32|      -12|
+------+----------+---------+

Since there are more than two columns to correlate, we need to compute a correlation matrix. To calculate the correlation matrix we need an RDD[Vector], as you can see in the Spark correlations example.

scala> val dense_rdd = drop_date_df.rdd.map{row =>
     |         val first = row.getAs[Integer]("amount")
     |         val second = row.getAs[Integer]("prediction")
     |         val third = row.getAs[Integer]("residuals")
     |         Vectors.dense(first.toDouble,second.toDouble,third.toDouble)}
dense_rdd: org.apache.spark.rdd.RDD[org.apache.spark.mllib.linalg.Vector] = MapPartitionsRDD[62] at map at <console>:40

scala> val correlMatrix: Matrix = Statistics.corr(dense_rdd, "pearson")
correlMatrix: org.apache.spark.mllib.linalg.Matrix =
1.0                  0.40467032516705076  0.782939330961529
0.40467032516705076  1.0                  -0.2520531290688281
0.782939330961529    -0.2520531290688281  1.0

The order of the columns is preserved, but you lose the column names. You can find good resources on the structure of a correlation matrix.
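If you want the entries to stay readable, here is a minimal sketch (assuming the column order of drop_date_df above, i.e. amount, prediction, residuals) that prints each matrix entry together with its column names:

// label each correlation with its column names (column order follows drop_date_df.columns)
val cols = drop_date_df.columns
for (i <- 0 until correlMatrix.numRows; j <- 0 until correlMatrix.numCols)
  println(s"corr(${cols(i)}, ${cols(j)}) = ${correlMatrix(i, j)}")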

Since you only want to find the correlation of the residuals with the other two columns, we can explore other options.

Hive corr UDAF

scala> drop_date_df.createOrReplaceTempView("temp_table")

scala> val corr_query_df = spark.sql("select corr(amount,residuals) as amount_residuals_corr,corr(prediction,residuals) as prediction_residual_corr from temp_table")
corr_query_df: org.apache.spark.sql.DataFrame = [amount_residuals_corr: double, prediction_residual_corr: double]

scala> corr_query_df.show
+---------------------+------------------------+
|amount_residuals_corr|prediction_residual_corr|
+---------------------+------------------------+
|   0.7829393309615287|      -0.252053129068828|
+---------------------+------------------------+

Spark corr function (link)

scala> val corr_df = drop_date_df.select(
     |                 corr('amount,'residuals).as("amount_residuals_corr"),
     |                 corr('prediction,'residuals).as("prediction_residual_corr"))
corr_df: org.apache.spark.sql.DataFrame = [amount_residuals_corr: double, prediction_residual_corr: double]

scala> corr_df.show
+---------------------+------------------------+
|amount_residuals_corr|prediction_residual_corr|
+---------------------+------------------------+
|   0.7829393309615287|      -0.252053129068828|
+---------------------+------------------------+
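If what you actually need is the autocorrelation of the residuals over time (what R's acf function reports), a rough sketch of the lag-1 case, assuming the rows are ordered by date, reuses the same corr function together with a lag window; higher lags work the same way:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{corr, lag}

// sort by date so that lag(1) pairs each residual with the previous day's residual;
// a single-partition window is fine for a short series like this one
val w = Window.orderBy("date")
val lagged = input_df.withColumn("residuals_lag1", lag('residuals, 1).over(w))

lagged.select(corr('residuals, 'residuals_lag1).as("residuals_acf_lag1")).show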