
How to replace dataframe columns having ? symbol with mean value of the column in spark scala?

I have a dataframe with the following columns:

one two three four
3 ? Jaun 3.47
3 164 Jaun 3.47
1 ? ? 2.68
3 164 Kaul ?
1 ? ? 2.68

I need to replace the "?" entries in the df columns with the mean or the mode, depending on the column's data type: if the column is of Int type -> replace "?" with the mean; if the column is of String type -> replace "?" with the mode.

Expected output:

one two three four
3 65.6 Jaun 3.47
3 164 Jaun 3.47
1 65.6 Jaun 2.68
3 164 Kaul 2.46
1 65.6 Jaun 2.68
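For reference, the expected 65.6 in column two appears to come from treating "?" as 0 when averaging, i.e. (0 + 164 + 0 + 164 + 0) / 5 = 65.6, rather than averaging only the non-"?" values (which would give 164). A minimal plain-Scala sketch of that arithmetic, with the sample data hard-coded:

```scala
// Column "two" from the sample; "?" marks a missing value
val two = Seq("?", "164", "?", "164", "?")

// Mean with "?" counted as 0 -- reproduces the expected 65.6
val meanWithZeros =
  two.map(v => if (v == "?") 0.0 else v.toDouble).sum / two.size

// Mean over only the non-"?" values -- gives 164.0 instead
val meanNonMissing = {
  val xs = two.filterNot(_ == "?").map(_.toDouble)
  xs.sum / xs.size
}

println(meanWithZeros)  // 65.6
println(meanNonMissing) // 164.0
```

Pick whichever definition of "mean" matches your requirement before imputing; the answers below follow the first one, since it matches the expected output.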

The mode is a bit tricky, because Spark SQL does not support mode() or anything similar. But you can use window functions:

select t.*,
       coalesce(two, two_avg),
       coalesce(three,
                max(case when three_cnt = max_three_cnt then three end) over ()
               ),
       coalesce(four, four_avg)
from (select t.*,
             max(three_cnt) over () as max_three_cnt
      from (select t.*,
                   avg(two) over () as two_avg,
                   count(*) over (partition by three) as three_cnt,
                   avg(four) over () as four_avg
            from t
           ) t
     ) t;
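The mode trick in the query above boils down to: count the rows per value of three, take the maximum count, and pick a value whose count equals that maximum. The same logic in plain Scala, with the sample data hard-coded (note that "?" must be excluded before counting, otherwise it can win the tie itself):

```scala
// Column "three" from the sample; "?" marks a missing value
val three = Seq("Jaun", "Jaun", "?", "Kaul", "?")

// Occurrence count per value, with "?" excluded
val counts: Map[String, Int] =
  three.filterNot(_ == "?").groupBy(identity).map { case (v, vs) => v -> vs.size }

// The mode: any value whose count equals the maximum count
val maxCount = counts.values.max
val mode = counts.collectFirst { case (v, c) if c == maxCount => v }.get

println(mode) // Jaun
```

In the SQL version the same exclusion is handled by the `case when three_cnt = max_three_cnt` filter combined with computing `max_three_cnt` only over rows where three is present.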

The code below produces the desired result, though it could use some optimization.

  import org.apache.spark.sql.SparkSession
  import org.apache.spark.sql.expressions.Window
  import org.apache.spark.sql.functions._

  val spark = SparkSession.builder().master("local[*]").getOrCreate()
  import spark.implicits._
  spark.sparkContext.setLogLevel("ERROR")

  val inDF = // Read data

  inDF
    // numeric columns: replace "?" with the column mean ("?" counted as 0, matching the expected output)
    .withColumn("two", when('two === "?",
      mean(when('two === "?", 0).otherwise('two)).over()).otherwise('two))
    .withColumn("four", when('four === "?",
      mean(when('four === "?", 0).otherwise('four)).over()).otherwise('four))
    // string column: count occurrences per value of "three" ...
    .withColumn("no_occurrence", count("*").over(Window.partitionBy("three")))
    // ... find the highest count among non-"?" values ...
    .withColumn("max_occurrence", when('three =!= "?", max('no_occurrence).over()).otherwise(0))
    // ... and take a value with that count as the mode
    .withColumn("replacement", max(when('no_occurrence === 'max_occurrence, 'three)).over())
    .withColumn("three", when('three === "?", 'replacement).otherwise('three))
    .drop("no_occurrence", "max_occurrence", "replacement")
    .show(false)

    +---+----+-----+----+
    |one|two |three|four|
    +---+----+-----+----+
    |3  |65.6|Jaun |3.47|
    |3  |164 |Jaun |3.47|
    |3  |164 |Kaul |2.46|
    |1  |65.6|Jaun |2.68|
    |1  |65.6|Jaun |2.68|
    +---+----+-----+----+