使用 scala 数据帧中的最小值和最大值查找正常值
Find Normal value using Min and Max from scala data-frame
我有一个包含 39 列的数据框,每一列都有不同的正常范围。
通过使用正常范围,我想找出正常值并输入 0,否则输入 1。
这是我所做的,但我想为 39 列做。
val test :(Double => Double) = (value: Double) =>
{
if(value >= 45 && value <= 62) 0
else 1
}
但我不明白如何对每一列使用不同的值。
例如:
我有这个DF
+--------------------+---------+-------------------------+---------+
|a |b |c |d |
+--------------------+---------+-------------------------+---------+
| 207.0| 40.0| 193.0| 39.0|
| 98.0| 17.0| 193.0| 15.0|
| 207.0| 13.0| 193.0| 17.0|
| 207.0| 26.0| 193.0| 23.0|
| 207.0| 35.0| 193.0| 24.0|
| 207.0| 91.0| 193.0| 45.0|
| 207.0| 40.0| 193.0| 37.0|
| 207.0| 23.0| 193.0| 23.0|
| 207.0| 26.0| 193.0| 22.0|
| 207.0| 39.0| 193.0| 34.0|
我想要如下使用范围
的结果
col range
a 50-160
b 1-21
c 5-40
d 7-27
如果值在范围内则为 0 否则为 1
+--------------------+---------+-------------------------+---------+
|a |b |c |d |
+--------------------+---------+-------------------------+---------+
| 1.0| 1.0| 1.0| 1.0|
| 0.0| 0.0| 1.0| 0.0|
| 1.0| 0.0| 1.0| 0.0|
| 1.0| 1.0| 1.0| 0.0|
| 1.0| 1.0| 1.0| 0.0|
| 1.0| 1.0| 1.0| 1.0|
| 1.0| 1.0| 1.0| 1.0|
| 1.0| 1.0| 1.0| 0.0|
| 1.0| 1.0| 1.0| 0.0|
| 1.0| 1.0| 1.0| 1.0|
I want to do this for 39 columns.(scala/pyspark preferred)
您应该定义一个用户定义函数 (UDF),然后将其应用到您想要的每一列。
这是有关 Scala 用户定义函数的文档。它相当完整,我鼓励您阅读它。
这里是一个摘录,可以帮助您快速了解我想去的地方:
scala> df.withColumn("upper", upper('text)).show
+---+-----+-----+
| id| text|upper|
+---+-----+-----+
| 0|hello|HELLO|
| 1|world|WORLD|
+---+-----+-----+
// You could have also defined the UDF this way
val upperUDF = udf { s: String => s.toUpperCase }
// or even this way
val upperUDF = udf[String, String](_.toUpperCase)
scala> df.withColumn("upper", upperUDF('text)).show
+---+-----+-----+
| id| text|upper|
+---+-----+-----+
| 0|hello|HELLO|
| 1|world|WORLD|
+---+-----+-----+
您看到您的函数适用于整个列,结果将是一个新列。因此,您的函数应如下所示:
def isInRange(e: Number, min: Number, max: Number): Boolean = (e < max && e > min)
然后,对于给定的最小值和最大值,您要做的就是:
myDF.withColumn("isInRange_a", udf(x => isInRange(x, minValue, maxValue).apply(myDF("a")))
你现在可以做的是将它应用到包含 (varName, maxValue, minValue) 的给定 List/DataFrame 上:
一个 map/reduce 操作,您可以在其中计算每一列是否在给定范围内。然后,您将加入给定的密钥(我对您的问题了解不多,所以在这里无法为您提供帮助)。这个解决方案有效,但随着数据的增长会变得非常低效,因为你可能有几个看起来很相似的键。
要么是递归操作,其目标是执行如下操作:myDF.whithColumn(...).withColumn(...).withColumn(...)
etc
第二种解决方案是我会选择的解决方案,因为密钥可能看起来很相似。
你是怎么做到的?
def applyMyUDFRecursively(myDF: DataFrame, List[MyRange]: rangesList): DataFrame =
if (rangesList == null || rangesList.isEmpty) myDF
else applyMyUDFRecursively(
myDF.withColumn(myDF.withColumn("isInRange_" + rangesList.head._0, udf(x => isInRange(x, rangesList.head._1, rangesList.head._2).apply(myDF(rangesList.head._0))), rangesList.tail)
现在您已应用到所有列,但您的列可能太多了。做这样的事情:
resultDF.drop(rangesList.map(case x => x._0).collect: _*)
注意类型归属,将 drop 函数应用于 map/collect
时获得的列表中的所有元素
with val MyRange = Seq(varName: String, min: Number, max: Number)
例如。对于您的 DataFrame,它应该如下所示(更简单的版本):
def recApply(myDF: DataFrame, cols: List[String]): DataFrame =
if (cols == null || cols.isEmpty) myDF
else recApply(myDF.withColumn(myDF.withColumn("isInRange_" + col.head, udf(x => test(x).apply(myDF(cols.head))), cols.tail)
然后,将此函数应用于您的 DF 并存储您的结果:
val my_result = recApply(myDF, myDF.cols)
我有一个包含 39 列的数据框,每一列都有不同的正常范围。 通过使用正常范围,我想找出正常值并输入 0,否则输入 1。
这是我所做的,但我想为 39 列做。
val test :(Double => Double) = (value: Double) =>
{
if(value >= 45 && value <= 62) 0
else 1
}
但我不明白如何对每一列使用不同的值。
例如: 我有这个DF
+--------------------+---------+-------------------------+---------+
|a |b |c |d |
+--------------------+---------+-------------------------+---------+
| 207.0| 40.0| 193.0| 39.0|
| 98.0| 17.0| 193.0| 15.0|
| 207.0| 13.0| 193.0| 17.0|
| 207.0| 26.0| 193.0| 23.0|
| 207.0| 35.0| 193.0| 24.0|
| 207.0| 91.0| 193.0| 45.0|
| 207.0| 40.0| 193.0| 37.0|
| 207.0| 23.0| 193.0| 23.0|
| 207.0| 26.0| 193.0| 22.0|
| 207.0| 39.0| 193.0| 34.0|
我想要如下使用范围
的结果col range
a 50-160
b 1-21
c 5-40
d 7-27
如果值在范围内则为 0 否则为 1
+--------------------+---------+-------------------------+---------+
|a |b |c |d |
+--------------------+---------+-------------------------+---------+
| 1.0| 1.0| 1.0| 1.0|
| 0.0| 0.0| 1.0| 0.0|
| 1.0| 0.0| 1.0| 0.0|
| 1.0| 1.0| 1.0| 0.0|
| 1.0| 1.0| 1.0| 0.0|
| 1.0| 1.0| 1.0| 1.0|
| 1.0| 1.0| 1.0| 1.0|
| 1.0| 1.0| 1.0| 0.0|
| 1.0| 1.0| 1.0| 0.0|
| 1.0| 1.0| 1.0| 1.0|
I want to do this for 39 columns.(scala/pyspark preferred)
您应该定义一个用户定义函数 (UDF),然后将其应用到您想要的每一列。
这是有关 Scala 用户定义函数的文档。它相当完整,我鼓励您阅读它。
这里是一个摘录,可以帮助您快速了解我想去的地方:
scala> df.withColumn("upper", upper('text)).show
+---+-----+-----+
| id| text|upper|
+---+-----+-----+
| 0|hello|HELLO|
| 1|world|WORLD|
+---+-----+-----+
// You could have also defined the UDF this way
val upperUDF = udf { s: String => s.toUpperCase }
// or even this way
val upperUDF = udf[String, String](_.toUpperCase)
scala> df.withColumn("upper", upperUDF('text)).show
+---+-----+-----+
| id| text|upper|
+---+-----+-----+
| 0|hello|HELLO|
| 1|world|WORLD|
+---+-----+-----+
您看到您的函数适用于整个列,结果将是一个新列。因此,您的函数应如下所示:
def isInRange(e: Number, min: Number, max: Number): Boolean = (e < max && e > min)
然后,对于给定的最小值和最大值,您要做的就是:
myDF.withColumn("isInRange_a", udf(x => isInRange(x, minValue, maxValue).apply(myDF("a")))
你现在可以做的是将它应用到包含 (varName, maxValue, minValue) 的给定 List/DataFrame 上:
一个 map/reduce 操作,您可以在其中计算每一列是否在给定范围内。然后,您将加入给定的密钥(我对您的问题了解不多,所以在这里无法为您提供帮助)。这个解决方案有效,但随着数据的增长会变得非常低效,因为你可能有几个看起来很相似的键。
要么是递归操作,其目标是执行如下操作:
myDF.whithColumn(...).withColumn(...).withColumn(...)
etc
第二种解决方案是我会选择的解决方案,因为密钥可能看起来很相似。
你是怎么做到的?
def applyMyUDFRecursively(myDF: DataFrame, List[MyRange]: rangesList): DataFrame =
if (rangesList == null || rangesList.isEmpty) myDF
else applyMyUDFRecursively(
myDF.withColumn(myDF.withColumn("isInRange_" + rangesList.head._0, udf(x => isInRange(x, rangesList.head._1, rangesList.head._2).apply(myDF(rangesList.head._0))), rangesList.tail)
现在您已应用到所有列,但您的列可能太多了。做这样的事情:
resultDF.drop(rangesList.map(case x => x._0).collect: _*)
注意类型归属,将 drop 函数应用于 map/collect
时获得的列表中的所有元素with val MyRange = Seq(varName: String, min: Number, max: Number)
例如。对于您的 DataFrame,它应该如下所示(更简单的版本):
def recApply(myDF: DataFrame, cols: List[String]): DataFrame =
if (cols == null || cols.isEmpty) myDF
else recApply(myDF.withColumn(myDF.withColumn("isInRange_" + col.head, udf(x => test(x).apply(myDF(cols.head))), cols.tail)
然后,将此函数应用于您的 DF 并存储您的结果:
val my_result = recApply(myDF, myDF.cols)