使用 InterQuartileRange 消除 Spark 中的异常值会导致错误
Outlier Elimination in Spark With InterQuartileRange Results in Error
我有以下使用 InterQuartileRange 方法确定异常值的递归函数:
def interQuartileRangeFiltering(df: DataFrame): DataFrame = {
@scala.annotation.tailrec
def inner(cols: List[String], acc: DataFrame): DataFrame = cols match {
case Nil => acc
case column :: xs =>
val quantiles = acc.stat.approxQuantile(column, Array(0.25, 0.75), 0.0) // TODO: values should come from config
println(s"$column ${quantiles.size}")
val q1 = quantiles(0)
val q3 = quantiles(1)
val iqr = q1 - q3
val lowerRange = q1 - 1.5 * iqr
val upperRange = q3 + 1.5 * iqr
val filtered = acc.filter(s"$column < $lowerRange or $column > $upperRange")
inner(xs, filtered)
}
inner(df.columns.toList, df)
}
val outlierDF = interQuartileRangeFiltering(incomingDF)
基本上我所做的是递归迭代列并消除异常值。奇怪的是,它导致 ArrayIndexOutOfBounds 异常并打印以下内容:
housing_median_age 2
inland 2
island 2
population 2
total_bedrooms 2
near_bay 2
near_ocean 2
median_house_value 0
java.lang.ArrayIndexOutOfBoundsException: 0
at inner(<console>:75)
at interQuartileRangeFiltering(<console>:83)
... 54 elided
我的方法有什么问题?
这是我想出的并且效果很好:
def outlierEliminator(df: DataFrame, colsToIgnore: List[String])(fn: (String, DataFrame) => (Double, Double)): DataFrame = {
val ID_COL_NAME = "id"
val dfWithId = DataFrameUtils.addColumnIndex(spark, df, ID_COL_NAME)
val dfWithIgnoredCols = dfWithId.drop(colsToIgnore: _*)
@tailrec
def inner(
cols: List[String],
filterIdSeq: List[Long],
dfWithId: DataFrame
): List[Long] = cols match {
case Nil => filterIdSeq
case column :: xs =>
if (column == ID_COL_NAME) {
inner(xs, filterIdSeq, dfWithId)
} else {
val (lowerBound, upperBound) = fn(column, dfWithId)
val filteredIds =
dfWithId
.filter(s"$column < $lowerBound or $column > $upperBound")
.select(col(ID_COL_NAME))
.map(r => r.getLong(0))
.collect
.toList
inner(xs, filteredIds ++ filterIdSeq, dfWithId)
}
}
val filteredIds = inner(dfWithIgnoredCols.columns.toList, List.empty[Long], dfWithIgnoredCols)
dfWithId.except(dfWithId.filter($"$ID_COL_NAME".isin(filteredIds: _*)))
}
我有以下使用 InterQuartileRange 方法确定异常值的递归函数:
def interQuartileRangeFiltering(df: DataFrame): DataFrame = {
@scala.annotation.tailrec
def inner(cols: List[String], acc: DataFrame): DataFrame = cols match {
case Nil => acc
case column :: xs =>
val quantiles = acc.stat.approxQuantile(column, Array(0.25, 0.75), 0.0) // TODO: values should come from config
println(s"$column ${quantiles.size}")
val q1 = quantiles(0)
val q3 = quantiles(1)
val iqr = q1 - q3
val lowerRange = q1 - 1.5 * iqr
val upperRange = q3 + 1.5 * iqr
val filtered = acc.filter(s"$column < $lowerRange or $column > $upperRange")
inner(xs, filtered)
}
inner(df.columns.toList, df)
}
val outlierDF = interQuartileRangeFiltering(incomingDF)
基本上我所做的是递归迭代列并消除异常值。奇怪的是,它导致 ArrayIndexOutOfBounds 异常并打印以下内容:
housing_median_age 2
inland 2
island 2
population 2
total_bedrooms 2
near_bay 2
near_ocean 2
median_house_value 0
java.lang.ArrayIndexOutOfBoundsException: 0
at inner(<console>:75)
at interQuartileRangeFiltering(<console>:83)
... 54 elided
我的方法有什么问题?
这是我想出的并且效果很好:
def outlierEliminator(df: DataFrame, colsToIgnore: List[String])(fn: (String, DataFrame) => (Double, Double)): DataFrame = {
val ID_COL_NAME = "id"
val dfWithId = DataFrameUtils.addColumnIndex(spark, df, ID_COL_NAME)
val dfWithIgnoredCols = dfWithId.drop(colsToIgnore: _*)
@tailrec
def inner(
cols: List[String],
filterIdSeq: List[Long],
dfWithId: DataFrame
): List[Long] = cols match {
case Nil => filterIdSeq
case column :: xs =>
if (column == ID_COL_NAME) {
inner(xs, filterIdSeq, dfWithId)
} else {
val (lowerBound, upperBound) = fn(column, dfWithId)
val filteredIds =
dfWithId
.filter(s"$column < $lowerBound or $column > $upperBound")
.select(col(ID_COL_NAME))
.map(r => r.getLong(0))
.collect
.toList
inner(xs, filteredIds ++ filterIdSeq, dfWithId)
}
}
val filteredIds = inner(dfWithIgnoredCols.columns.toList, List.empty[Long], dfWithIgnoredCols)
dfWithId.except(dfWithId.filter($"$ID_COL_NAME".isin(filteredIds: _*)))
}