How to replace dataframe columns having ? symbol with mean value of the column in spark scala?
A dataframe with the following columns:

one | two | three | four |
---|---|---|---|
3 | ? | Jaun | 3.47 |
3 | 164 | Jaun | 3.47 |
1 | ? | ? | 2.68 |
3 | 164 | Kaul | ? |
1 | ? | ? | 2.68 |
The "?" entries in the df columns need to be replaced with the mean or the mode, depending on the column's data type:
If a column is of Int type -> replace with the mean
If a column is of String type -> replace with the mode
Expected output:

one | two | three | four |
---|---|---|---|
3 | 65.6 | Jaun | 3.47 |
3 | 164 | Jaun | 3.47 |
1 | 65.6 | Jaun | 2.68 |
3 | 164 | Kaul | 2.46 |
1 | 65.6 | Jaun | 2.68 |
The mode is a bit tricky, because Spark SQL does not support mode() or any similar function. But you can use window functions:
```sql
select t.*,
       coalesce(two, two_avg),
       coalesce(three,
                max(case when three_cnt = max_three_cnt then three end) over ()
       ),
       coalesce(four, four_avg)
from (select t.*,
             max(three_cnt) over () as max_three_cnt
      from (select t.*,
                   avg(two) over () as two_avg,
                   count(*) over (partition by three) as three_cnt,
                   avg(four) over () as four_avg
            from t
           ) t
     ) t;
```
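Note that coalesce() only fires on real NULLs, so the "?" markers have to be nulled out before this query can do anything. Below is a minimal sketch of wiring it up from Scala; the read step, the column list, and the view name `t` are assumptions:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, lit, when}

val spark = SparkSession.builder().master("local[*]").getOrCreate()
val inDF = spark.read.option("header", "true").csv("input.csv") // hypothetical source

// Turn every "?" marker into a real NULL so coalesce() can supply the fallback.
val withNulls = Seq("two", "three", "four").foldLeft(inDF) { (df, c) =>
  df.withColumn(c, when(col(c) === "?", lit(null)).otherwise(col(c)))
}

// Expose the frame under the name "t" referenced by the query, then run it.
withNulls.createOrReplaceTempView("t")
spark.sql("""
  select t.*,
         coalesce(two, two_avg),
         coalesce(three,
                  max(case when three_cnt = max_three_cnt then three end) over ()),
         coalesce(four, four_avg)
  from (select t.*,
               max(three_cnt) over () as max_three_cnt
        from (select t.*,
                     avg(two) over () as two_avg,
                     count(*) over (partition by three) as three_cnt,
                     avg(four) over () as four_avg
              from t
             ) t
       ) t
""").show(false)
```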
The code below produces the desired result, but it may need some optimization.
```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

val spark = SparkSession.builder().master("local[*]").getOrCreate()
import spark.implicits._
spark.sparkContext.setLogLevel("ERROR")

val inDF = // Read data

inDF
  // Numeric columns: compute the mean over the whole frame, counting "?" as 0
  // (this is what yields 65.6 for `two`), and substitute it for each "?".
  .withColumn("two", when('two === "?",
    mean(when('two === "?", 0).otherwise('two)).over()).otherwise('two))
  .withColumn("four", when('four === "?",
    mean(when('four === "?", 0).otherwise('four)).over()).otherwise('four))
  // String column: count occurrences per value, find the highest count among
  // non-"?" values, and take the value carrying that count as the mode.
  .withColumn("no_occurrence", count("*").over(Window.partitionBy("three")))
  .withColumn("max_occurrence", when('three =!= "?", max('no_occurrence).over()).otherwise(0))
  .withColumn("replacement", max(when('no_occurrence === 'max_occurrence, 'three)).over())
  .withColumn("three", when('three === "?", 'replacement).otherwise('three))
  .drop("no_occurrence", "max_occurrence", "replacement")
  .show(false)
```
```
+---+----+-----+----+
|one|two |three|four|
+---+----+-----+----+
|3  |65.6|Jaun |3.47|
|3  |164 |Jaun |3.47|
|3  |164 |Kaul |2.46|
|1  |65.6|Jaun |2.68|
|1  |65.6|Jaun |2.68|
+---+----+-----+----+
```
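For reference, a quick stand-in for the `// Read data` step that reproduces the sample input above (hypothetical; any source that keeps the "?" markers as strings would do):

```scala
import spark.implicits._

// Sample rows from the question, every field kept as a string so the
// "?" markers survive the load (hypothetical stand-in for a real read).
val inDF = Seq(
  ("3", "?",   "Jaun", "3.47"),
  ("3", "164", "Jaun", "3.47"),
  ("1", "?",   "?",    "2.68"),
  ("3", "164", "Kaul", "?"),
  ("1", "?",   "?",    "2.68")
).toDF("one", "two", "three", "four")
```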
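The question asks for the replacement to be driven by the column's data type, while both answers hard-code the three columns. A generic sketch of that dispatch (an assumption of mine, not from either answer; it presumes "?" has already been converted to NULL and that numeric columns carry a numeric type in the schema):

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{avg, col, desc}
import org.apache.spark.sql.types.{NumericType, StringType}

// Walk the schema: numeric columns get their mean, string columns their mode.
// Assumes every column has at least one non-NULL value. Note that NULLs are
// excluded from the mean here, unlike the code above, which counts "?" as 0.
def impute(df: DataFrame): DataFrame =
  df.schema.fields.foldLeft(df) { (acc, field) =>
    field.dataType match {
      case _: NumericType =>
        val mean = acc.agg(avg(col(field.name))).head().getDouble(0)
        acc.na.fill(Map(field.name -> mean)) // cast back to the column type
      case StringType =>
        val mode = acc.filter(col(field.name).isNotNull)
          .groupBy(field.name).count()
          .orderBy(desc("count"))
          .head().getString(0)
        acc.na.fill(Map(field.name -> mode))
      case _ => acc
    }
  }
```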