使用pyspark识别大量数据的变化
Identifying changes in large amounts of data using pyspark
我有一个 DATE
列和一个 RESULT
列的大量数据(大约十亿行)。
RESULT
列中的值主要是名称,但有时值会出现显着偏差。我只想确定偏差较大的日期。
所以从这样的输入数据帧:
+----------+------+
| DATE|RESULT|
+----------+------+
|2020-06-24| 4.2|
|2020-05-17| 4.5|
|2020-05-11| 4.5|
|2020-07-30| 4.2|
|2020-07-30| 4.2|
|2020-06-29| 4.2|
|2020-06-29| 4.2|
|2020-03-04| 4.5|
|2020-06-01| 4.2|
|2020-06-27| 4.2|
|2020-06-29| 4.2|
|2020-06-29| 4.2|
|2020-04-17| 4.5|
|2020-04-17| 4.5|
|2020-01-04| 4.5|
|2020-02-29| 4.5|
|2020-07-07| 4.2|
|2020-05-07| 4.5|
|2020-06-09| 4.2|
|2020-06-22| 4.2|
+----------+------+
我希望输出:
+----------+------+
| DATE|RESULT|
+----------+------+
|2020-05-11| 4.5|
|2020-07-30| 4.2|
|2020-06-29| 4.2|
|2020-04-17| 4.5|
|2020-02-29| 4.5|
|2020-07-07| 4.2|
|2020-05-07| 4.5|
|2020-06-09| 4.2|
+----------+------+
我尝试使用window和滞后函数,但它迫使整个数据集成为一个节点,因此失去了使用分布式计算的优势。
我在 Whosebug 中遇到了使用中值和平均绝对偏差 (MAD) 并定义阈值来识别异常偏移记录的建议,但我在 pyspark.sql.functions 库中找不到 MAD 统计函数。
有没有人有更好的主意?我将不胜感激。
我在 pyspark 中编码,但如果解决方案在 spark/scala 中,那也很好。
谢谢
您可能会发现此 link 对计算 MAD https://www.advancinganalytics.co.uk/blog/2020/9/2/identifying-outliers-in-spark-30
很有用
从下面的 link 添加相关内容:
MAD=中位数(|xi-xm|)
其中 xm 是数据集的中值,xi 是数据集中的值。
MAD是每个值与整个数据集的中位数之差的中位数
考虑一个包含列 'category'、'data_col'
的 df
'percentile()' 需要计算一个列和一个百分位数数组(对于中位数,我们可以提供 'array(0.5)',因为 50% 的值是中位数)并且 return 一个结果数组.
MADdf = df.groupby('category')\
.agg(F.expr('percentile(data_col, array(0.5))')[0]\
.alias('data_col_median'))\
.join(df, "category", "left")\
.withColumn("data_col_difference_median", F.abs(F.col('data_col')-F.col('data_col_median')))\
.groupby('category', 'data_col_median')\
.agg(F.expr('percentile(data_col_difference_median, array(0.5))')[0]\
.alias('median_absolute_difference'))
我有一个 DATE
列和一个 RESULT
列的大量数据(大约十亿行)。
RESULT
列中的值主要是名称,但有时值会出现显着偏差。我只想确定偏差较大的日期。
所以从这样的输入数据帧:
+----------+------+
| DATE|RESULT|
+----------+------+
|2020-06-24| 4.2|
|2020-05-17| 4.5|
|2020-05-11| 4.5|
|2020-07-30| 4.2|
|2020-07-30| 4.2|
|2020-06-29| 4.2|
|2020-06-29| 4.2|
|2020-03-04| 4.5|
|2020-06-01| 4.2|
|2020-06-27| 4.2|
|2020-06-29| 4.2|
|2020-06-29| 4.2|
|2020-04-17| 4.5|
|2020-04-17| 4.5|
|2020-01-04| 4.5|
|2020-02-29| 4.5|
|2020-07-07| 4.2|
|2020-05-07| 4.5|
|2020-06-09| 4.2|
|2020-06-22| 4.2|
+----------+------+
我希望输出:
+----------+------+
| DATE|RESULT|
+----------+------+
|2020-05-11| 4.5|
|2020-07-30| 4.2|
|2020-06-29| 4.2|
|2020-04-17| 4.5|
|2020-02-29| 4.5|
|2020-07-07| 4.2|
|2020-05-07| 4.5|
|2020-06-09| 4.2|
+----------+------+
我尝试使用window和滞后函数,但它迫使整个数据集成为一个节点,因此失去了使用分布式计算的优势。 我在 Whosebug 中遇到了使用中值和平均绝对偏差 (MAD) 并定义阈值来识别异常偏移记录的建议,但我在 pyspark.sql.functions 库中找不到 MAD 统计函数。 有没有人有更好的主意?我将不胜感激。 我在 pyspark 中编码,但如果解决方案在 spark/scala 中,那也很好。 谢谢
您可能会发现此 link 对计算 MAD https://www.advancinganalytics.co.uk/blog/2020/9/2/identifying-outliers-in-spark-30
很有用从下面的 link 添加相关内容:
MAD=中位数(|xi-xm|)
其中 xm 是数据集的中值,xi 是数据集中的值。 MAD是每个值与整个数据集的中位数之差的中位数
考虑一个包含列 'category'、'data_col'
的 df'percentile()' 需要计算一个列和一个百分位数数组(对于中位数,我们可以提供 'array(0.5)',因为 50% 的值是中位数)并且 return 一个结果数组.
MADdf = df.groupby('category')\
.agg(F.expr('percentile(data_col, array(0.5))')[0]\
.alias('data_col_median'))\
.join(df, "category", "left")\
.withColumn("data_col_difference_median", F.abs(F.col('data_col')-F.col('data_col_median')))\
.groupby('category', 'data_col_median')\
.agg(F.expr('percentile(data_col_difference_median, array(0.5))')[0]\
.alias('median_absolute_difference'))