spark-scala: Transform a dataframe to generate a new gender column, and vice versa

table1:

class   male    female
1   2   1
2   0   2
3   2   0

table2:

class   gender
1   m
1   f
1   m
2   f
2   f
3   m
3   m

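Using spark-scala, I want to read the data from table1 and dump it into another table in the format of table2, as given above. I also need the reverse: going from table2 back to table1.

Please help me.

Thanks in advance.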

You can use a udf together with the explode function, as shown below.

  import org.apache.spark.sql.functions._
  import spark.implicits._

  val df=Seq((1,2,1),(2,0,2),(3,2,0)).toDF("class","male","female")

//Input Df

+-----+----+------+
|class|male|female|
+-----+----+------+
|    1|   2|     1|
|    2|   0|     2|
|    3|   2|     0|
+-----+----+------+

  val getGenderUdf=udf((x:Int,y:Int)=>List.fill(x)("m")++List.fill(y)("f"))
  val df1=df.withColumn("gender",getGenderUdf(df.col("male"),df.col("female"))).drop("male","female").withColumn("gender",explode($"gender"))
  df1.show()

+-----+------+
|class|gender|
+-----+------+
|    1|     m|
|    1|     m|
|    1|     f|
|    2|     f|
|    2|     f|
|    3|     m|
|    3|     m|
+-----+------+

To reverse df1:

val df2=df1.groupBy("class").pivot("gender").agg(count("gender")).na.fill(0).withColumnRenamed("m","male").withColumnRenamed("f","female")

  df2.show()

//Sample Output: 

+-----+------+----+
|class|female|male|
+-----+------+----+
|    1|     1|   2|
|    3|     0|   2|
|    2|     2|   0|
+-----+------+----+
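One caveat with the pivot above: the pivoted columns come from whatever values actually occur in `gender`, so if no row contains `f` there is no `f` column to rename. A minimal sketch of one way around this, assuming the only possible values are `m` and `f`, is to pass them to `pivot` explicitly. The sample data and the names `longDf` and `wideDf` are made up for illustration.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

// In spark-shell this simply returns the existing session
val spark = SparkSession.builder().master("local[*]").appName("pivot-sketch").getOrCreate()
import spark.implicits._

// Hypothetical sample in the table2 layout; note that no class has an "f" row
val longDf = Seq((1, "m"), (1, "m"), (3, "m")).toDF("class", "gender")

// Listing the pivot values up front keeps both columns even when one gender never occurs
val wideDf = longDf.groupBy("class")
  .pivot("gender", Seq("m", "f"))
  .agg(count("gender"))
  .na.fill(0)
  .withColumnRenamed("m", "male")
  .withColumnRenamed("f", "female")
  .select("class", "male", "female")

wideDf.orderBy("class").show()
```

Listing the values also fixes the column order and saves Spark the extra pass it otherwise needs to discover the distinct pivot values.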
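Another UDF-based variant:

  import org.apache.spark.sql.functions._
  import spark.implicits._

  val inDF = Seq((1, 2, 1),
    (2, 0, 2),
    (3, 2, 0)).toDF("class", "male", "female")

  // Build m copies of "m" followed by f copies of "f"; 1.to(0) is an empty range, so a zero count adds nothing
  val testUdf = udf((m: Int, f: Int) => {
    val ml = 1.to(m).map(_ => "m")
    val fml = 1.to(f).map(_ => "f")
    ml ++ fml
  })

  // Explode the generated list into one row per person
  val df1 = inDF.withColumn("mf", testUdf($"male", $"female"))
    .drop("male", "female")
    .select($"class", explode($"mf").alias("gender"))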

Perhaps this helps: a version without a UDF

spark>=2.4

Load the provided test data

val data =
      """
        |class |  male  |  female
        |1 |  2 |  1
        |2 |  0 |  2
        |3 |  2 |  0
      """.stripMargin

    val stringDS1 = data.split(System.lineSeparator())
      .map(_.split("\\|").map(_.replaceAll("""^[ \t]+|[ \t]+$""", "")).mkString(","))
      .toSeq.toDS()
    val df1 = spark.read
      .option("sep", ",")
      .option("inferSchema", "true")
      .option("header", "true")
      .option("nullValue", "null")
      .csv(stringDS1)
    df1.show(false)
    df1.printSchema()

    /**
      * +-----+----+------+
      * |class|male|female|
      * +-----+----+------+
      * |1    |2   |1     |
      * |2    |0   |2     |
      * |3    |2   |0     |
      * +-----+----+------+
      *
      * root
      * |-- class: integer (nullable = true)
      * |-- male: integer (nullable = true)
      * |-- female: integer (nullable = true)
      */

Compute the gender arrays and explode them

    df1.select($"class",
      when($"male" >= 1, sequence(lit(1), col("male"))).otherwise(array()).as("male"),
      when($"female" >= 1, sequence(lit(1), col("female"))).otherwise(array()).as("female")
    ).withColumn("male", expr("TRANSFORM(male, x -> 'm')"))
      .withColumn("female", expr("TRANSFORM(female, x -> 'f')"))
      .withColumn("gender", explode(concat($"male", $"female")))
      .select("class", "gender")
      .show(false)

    /**
      * +-----+------+
      * |class|gender|
      * +-----+------+
      * |1    |m     |
      * |1    |m     |
      * |1    |f     |
      * |2    |f     |
      * |2    |f     |
      * |3    |m     |
      * |3    |m     |
      * +-----+------+
      */
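The same no-UDF idea can probably be written more compactly with `array_repeat`, which is also available from Spark 2.4. This is a sketch of an alternative, not part of the answer above; the names `counts` and `exploded` are made up for illustration.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

// In spark-shell this simply returns the existing session
val spark = SparkSession.builder().master("local[*]").appName("array-repeat-sketch").getOrCreate()
import spark.implicits._

// Same data as table1
val counts = Seq((1, 2, 1), (2, 0, 2), (3, 2, 0)).toDF("class", "male", "female")

// array_repeat(lit("m"), male) builds an array holding "m" male times (empty for 0);
// concat joins the two arrays and explode emits one row per element
val exploded = counts
  .withColumn("gender", explode(concat(
    array_repeat(lit("m"), $"male"),
    array_repeat(lit("f"), $"female"))))
  .select("class", "gender")

exploded.show(false)
```

Note that `explode` drops rows whose array is empty, so a class with zero males and zero females would disappear from the result.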

And the reverse

// df2 is assumed to be the exploded (class, gender) dataframe produced by the previous step
df2.groupBy("class").agg(collect_list("gender").as("gender"))
      .withColumn("male", expr("size(FILTER(gender, x -> x='m'))"))
      .withColumn("female", expr("size(FILTER(gender, x -> x='f'))"))
      .select("class", "male", "female")
      .orderBy("class")
      .show(false)

    /**
      * +-----+----+------+
      * |class|male|female|
      * +-----+----+------+
      * |1    |2   |1     |
      * |2    |0   |2     |
      * |3    |2   |0     |
      * +-----+----+------+
      */
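If collecting every gender into a list per class is a concern for large groups, conditional aggregation is one possible alternative that counts directly. This is a sketch rather than the approach used above; the names `genders` and `counted` are made up for illustration.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

// In spark-shell this simply returns the existing session
val spark = SparkSession.builder().master("local[*]").appName("conditional-agg-sketch").getOrCreate()
import spark.implicits._

// Same data as table2
val genders = Seq((1, "m"), (1, "f"), (1, "m"), (2, "f"), (2, "f"), (3, "m"), (3, "m"))
  .toDF("class", "gender")

// Each row contributes 1 to the matching gender column and 0 to the other
val counted = genders.groupBy("class").agg(
    sum(when($"gender" === "m", 1).otherwise(0)).as("male"),
    sum(when($"gender" === "f", 1).otherwise(0)).as("female"))
  .orderBy("class")

counted.show(false)
```

The resulting `male` and `female` columns are of type long, so cast them if the target table expects integers.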