dropDuplicates 非数字条件

dropDuplicates with non-numeric condition

我有一个看起来像这样的数据框(我还有几列,但它们不相关):

+-----------+-----------+---------------+                                        
|item_id    |location_id|decision       |
+-----------+-----------+---------------+
|     111111|  A        |        True   |
|     111111|  A        |        False  |
|     111111|  A        |        False  |
|     222222|  B        |        False  |
|     222222|  B        |        False  |
|     333333|  C        |        True   |
|     333333|  C        |        True   |
|     333333|  C        |        Unsure |
+-----------+-----------+---------------+

我想做 dropDuplicates("item_id", "location_id") 这样我就可以删除具有相同 item_idlocation_id 的行,但我想保留包含 True 的行或 Unsure 如果存在。如果 none 的重复行包含 TrueUnsure,则任何包含 False 的行都可以。对于上面的示例,我希望生成的数据框如下所示:

+-----------+-----------+---------------+                                        
|item_id    |location_id|decision       |
+-----------+-----------+---------------+
|     111111|  A        |        True   |
|     222222|  B        |        False  |
|     333333|  C        |        Unsure |
+-----------+-----------+---------------+

对于 item_id 111111 和 location_id A,我想要带有 decision True 的行,因为存在这样的行。对于 item_id 222222 和 location_id B,因为 none 行包含 True,所以选择其中一个都可以。对于 item_id 333333 和 location_id C,所有行都包含所需的值 TrueUnsure,因此选择三个中的任何一个都可以。

我正在使用 Scala,因此非常感谢 Scala 中的解决方案。

这是代码:

输入准备:

//spark : My SparkSession
import spark.implicits._
  val df = Seq(
    (111111, "A", "True"),
    (111111, "A", "False"),
    (111111, "A", "False"),
    (222222, "B", "False"),
    (222222, "B", "False"),
    (333333, "C", "True"),
    (333333, "C", "True"),
    (333333, "C", "Unsure")
  ).toDF("item_id", "location_id", "decision")

  df.printSchema()
  /** root
    * |-- item_id: integer (nullable = false)
    * |-- location_id: string (nullable = true)
    * |-- decision: string (nullable = true)
    */

实现所需输出的代码:

import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window

//Step 1) create a WindowSpec : MyWindow is same as that of groupBy("item_id", "location_id") 
//but I want to keep track of the order of True, False and Unsure in that partition 
//so, will order my partition based on the col("decision") which is why we have window functions.
  val MyWindow = Window
    .partitionBy(col("item_id"), col("location_id"))
    .orderBy(desc("decision"))

  df
//Step 2) add row_number to each record in that window (based on the mentioned ordering in MyWindow),
//in this case based on the descending order of col("decision")
    .withColumn("row_number", row_number().over(MyWindow))
//Step 3) It turns out we only need first row from each partition  
//based on the decision to select Unsure (then) True (then) False (based on the order of preference),
//so, we filter in only first row.
    .filter(col("row_number").equalTo(1))
    .drop(col("row_number"))
    .orderBy(col("item_id"))
    .show(false)


/**
OUTPUT:
+-------+-----------+--------+
|item_id|location_id|decision|
+-------+-----------+--------+
|111111 |A          |True    |
|222222 |B          |False   |
|333333 |C          |Unsure  |
+-------+-----------+--------+
*/

EDIT1(根据评论):

改进的代码(没有在 WindowSpec 中订购 col("decision")):

为此,您需要编写自定义UserDefinedAggregateFunction,以便您更好地控制决策属性值的范围,在您的要求中可以是这样的:

 object MyBestDecisionUDF extends UserDefinedAggregateFunction {

    // step 1) : to set priority score to your decisions which you can configure somewhere
    val decisionOrderMap =
      Map("Unsure" -> 4, "True" -> 3, "False" -> 2, "Zinc" -> 1, "Copper" -> 0)

    /** all overridden functions come from UserDefinedAggregateFunction Abstract Class
      */
    override def inputSchema: StructType = StructType(
      StructField("input_str", StringType, false) :: Nil
    )

    override def bufferSchema: StructType = StructType(
      StructField("buffer_str", StringType, false) :: Nil
    )

    override def dataType: DataType = StringType

    override def deterministic: Boolean = true

    override def initialize(buffer: MutableAggregationBuffer): Unit = {
      buffer.update(0, "")
    }

    override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
      // main step : updating buffer always to hold best decision string value
      if (
        decisionOrderMap.getOrElse(
          buffer.get(0).toString(),
          -1
        ) < decisionOrderMap.getOrElse(input(0).toString(), -1)
      ) {
        buffer.update(0, input(0))
      }

    }

    override def merge(
        buffer1: MutableAggregationBuffer,
        buffer2: Row
    ): Unit = {}

    override def evaluate(buffer: Row): Any = {
      buffer(0)
    }

  }

  /** ############################################################### 
    * Calling Custom UDAF
    * ###############################################################
    */

/**
INPUT:

+-------+-----------+--------+
|item_id|location_id|decision|
+-------+-----------+--------+
|111111 |A          |True    |
|111111 |A          |False   |
|111111 |A          |False   |
|222222 |B          |False   |
|222222 |B          |False   |
|333333 |C          |True    |
|333333 |C          |Unsure  |
|444444 |D          |Copper  |
|444444 |D          |Zinc    |
+-------+-----------+--------+

*/

  df
  // Custom UDF evaluated column
    .withColumn(
      "my_best_decision",
      MyBestDecisionUDF(col("decision")).over(
        Window
          .partitionBy(col("item_id"), col("location_id"))
      )
    )
    .drop(col("decision"))
    .distinct()
    .orderBy(col("item_id"))
    .show(false)
/**
 * OUTPUT:
+-------+-----------+----------------+
|item_id|location_id|my_best_decision|
+-------+-----------+----------------+
|111111 |A          |True            |
|222222 |B          |False           |
|333333 |C          |Unsure          |
|444444 |D          |Zinc            |
+-------+-----------+----------------+
*/

离开你对 Lingaraj 的回答的评论,有一种比向 decision 添加文字更复杂的解决方法。您可以创建一个布尔值:

val decisionIsImportant = col("decision") === "True" || col("decision") === "Unsure" || col("decision") === Zebra || col("decision") == Xylophone

如果 decision 列等于您要确定优先级的任何值,则此布尔值的计算结果为 True

然后,您可以创建一个 Window 并将其应用于 DataFrame:

val decisionSortWindow = Window
      .partitionBy(col("item_id"), col("location_id")))
      .orderBy(decisionIsImportant.desc)

df.withColumn("row_number", row_number().over(MyWindow))
  .filter(col("row_number").equalTo(1))
  .drop(col("row_number"))
  .orderBy(col("item_id"))
  .show(false)

由于所有评估为 Truedecision 值将首先排序,您可以仅按 col("row_number").equalTo(1) 进行过滤,因为行号 1 肯定是重要的行之一是否有人在场的决定。