Aggregating a Kolmogorov-Smirnov test in PySpark
Is there a way to apply the KS test from the spark.mllib library in PySpark using a groupBy clause or some kind of aggregation method?
For example, I have a DataFrame df with columns ID and RESULT like so:
+-------+------+
| ID|RESULT|
+-------+------+
|3648296| 2.73|
|3648296| 9.64|
|3648189| 0.03|
|3648189| 0.03|
|3648296| 2.51|
|3648189| 0.01|
|3648296| 1.75|
|3648296| 30.23|
|3648189| 0.02|
|3648189| 0.02|
|3648189| 0.02|
|3648296| 3.28|
|3648296| 32.55|
|3648296| 2.32|
|3648296| 34.58|
|3648296| 29.22|
|3648189| 0.02|
|3648296| 1.36|
|3648296| 1.64|
|3648296| 1.17|
+-------+------+
There are two IDs, 3648296 and 3648189, each associated with on the order of a few hundred thousand RESULT values.
Is it possible to apply the groupBy function like this:
from pyspark.mllib.stat import Statistics
normtest=df.groupBy('ID').Statistics.kolmogorovSmirnovTest(df.RESULT, "norm", 0, 1)
so that I get an output DataFrame like this:
+-------+---------+----------+
| ID|p-value |statistic |
+-------+---------+----------+
|3648296|some val | some val |
|3648189|some val | some val |
+-------+---------+----------+
Is this possible?
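For context, spark.mllib's Statistics.kolmogorovSmirnovTest takes an RDD of doubles rather than a grouped column, so it cannot be chained off groupBy directly. A non-aggregated baseline is to filter per ID and run the test once per group; a minimal sketch, assuming an active SparkSession named spark and only a handful of distinct IDs:

from pyspark.mllib.stat import Statistics

# Run the mllib KS test once per ID by filtering; this launches one Spark
# job per ID, so it is only practical when there are few distinct IDs.
ids = [row["ID"] for row in df.select("ID").distinct().collect()]
rows = []
for i in ids:
    rdd = df.filter(df.ID == i).select("RESULT").rdd.map(lambda r: float(r[0]))
    test = Statistics.kolmogorovSmirnovTest(rdd, "norm", 0.0, 1.0)
    rows.append((i, float(test.pValue), float(test.statistic)))

# 'spark' is assumed to be an existing SparkSession.
normtest = spark.createDataFrame(rows, ["ID", "p-value", "statistic"])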
This can be solved by binning the data and then performing a Kolmogorov-Smirnov test on the binned data (i.e. on the histogram). It won't produce the exact maximum distance, but if your effective distribution is smooth, the result should be close enough.
By bucketing the results you ensure that only a limited number of items (the number of buckets) will be loaded into memory at a time.
First, we need to implement a histogram version of kstest:
import numpy as np
from scipy import stats


def hist_kstest(hist: np.ndarray, bin_edges: np.ndarray, cdf):
    # Cumulative counts per bucket; the last entry is the total sample size n.
    i = hist.cumsum()
    n = i[-1]
    # Evaluate the hypothesised CDF at the right edge of each bucket.
    bin_right_edges = bin_edges[1:]
    cdf_vals = cdf(bin_right_edges)
    # Two-sided KS statistic computed from the binned empirical CDF.
    statistic = np.max([
        cdf_vals - (i - 1) / n,
        i / n - cdf_vals
    ])
    pvalue = stats.distributions.kstwo.sf(statistic, n)
    return statistic, pvalue
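As a quick sanity check for hist_kstest (not part of the original answer), it can be compared locally against scipy.stats.kstest on a synthetic sample; with a reasonably fine binning the two statistics should come out close when the underlying distribution is smooth. The sample, bin count and distribution below are arbitrary and purely illustrative:

# Illustrative local check that the binned statistic tracks the exact one.
rng = np.random.default_rng(0)
sample = rng.normal(loc=0.5, scale=1.2, size=100_000)

hist, bin_edges = np.histogram(sample, bins=1_000)
binned_stat, binned_p = hist_kstest(hist, bin_edges, stats.norm(0, 1).cdf)
exact_stat, exact_p = stats.kstest(sample, "norm", args=(0, 1))

print(binned_stat, exact_stat)  # should be close for a smooth distribution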
Then use it like this:
from pyspark.sql import functions as F, types as T
from pyspark.ml.feature import QuantileDiscretizer
import pandas as pd
import numpy as np
from scipy import stats

# Choose the number of buckets. It depends on your memory
# availability and affects the accuracy of the test.
num_buckets = 1_000

# Choose the null hypothesis (H0)
h0_cdf = stats.norm(0, 1).cdf

# Bucket the result and get the buckets' edges
bucketizer = QuantileDiscretizer(
    numBuckets=num_buckets, inputCol='RESULT', outputCol='result_bucket'
).setHandleInvalid("keep").fit(df)

buckets = np.array(bucketizer.getSplits())


def kstest(key, pdf: pd.DataFrame):
    pdf.sort_values('result_bucket', inplace=True)
    hist = pdf['count'].to_numpy()
    # Some of the buckets might not appear in all the groups, so
    # we filter buckets that are not available. The bucket index comes
    # back as a double, so cast it to int before using it as an index.
    bucket_idx = pdf['result_bucket'].to_numpy().astype(int)
    bin_edges = buckets[[0, *(bucket_idx + 1)]]
    statistic, pvalue = hist_kstest(hist, bin_edges, h0_cdf)
    return pd.DataFrame([[*key, statistic, pvalue]])


df = bucketizer.transform(df).groupBy("ID", "result_bucket").agg(
    F.count("*").alias("count")
).groupby("ID").applyInPandas(kstest, "ID long, statistic double, pvalue double")