dropDuplicates with non-numeric condition
I have a DataFrame that looks like this (I have a few more columns, but they aren't relevant):
+-----------+-----------+---------------+
|item_id |location_id|decision |
+-----------+-----------+---------------+
| 111111| A | True |
| 111111| A | False |
| 111111| A | False |
| 222222| B | False |
| 222222| B | False |
| 333333| C | True |
| 333333| C | True |
| 333333| C | Unsure |
+-----------+-----------+---------------+
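I want to do dropDuplicates("item_id", "location_id") so that rows with the same item_id and location_id are removed, but I want to keep the row that contains True or Unsure if one exists. If none of the duplicate rows contains True or Unsure, any row containing False is fine. For the example above, I want the resulting DataFrame to look like this: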
+-----------+-----------+---------------+
|item_id |location_id|decision |
+-----------+-----------+---------------+
| 111111| A | True |
| 222222| B | False |
| 333333| C | Unsure |
+-----------+-----------+---------------+
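For item_id 111111 and location_id A, I want the row with decision True, because such a row exists. For item_id 222222 and location_id B, none of the rows contains True, so picking either one is fine. For item_id 333333 and location_id C, every row contains one of the desired values True or Unsure, so picking any of the three is fine.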
I'm using Scala, so a Scala solution would be much appreciated.
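The selection rule described above can be checked without Spark first. This is only a sketch on plain Scala collections; the names `BestDecisionRule`, `Rec`, `preferred` and `pickBest` are hypothetical and not part of the question. It treats True and Unsure as equally preferred and keeps the first matching row of each (item_id, location_id) group.

```scala
object BestDecisionRule {
  // (item_id, location_id, decision), mirroring the DataFrame's columns
  type Rec = (Int, String, String)

  // Decision values that should win over anything else (here: over "False")
  val preferred: Set[String] = Set("True", "Unsure")

  // Keep one record per (item_id, location_id): the first record with a preferred
  // decision if the group has one, otherwise the group's first record.
  def pickBest(rows: Seq[Rec]): Seq[Rec] =
    rows
      .groupBy { case (item, loc, _) => (item, loc) }
      .values
      .map(group => group.maxBy { case (_, _, d) => if (preferred(d)) 1 else 0 })
      .toSeq
      .sortBy { case (item, loc, _) => (item, loc) }
}
```

On the sample data this returns one row per key; for 333333/C it keeps the first preferred row (True), which the question accepts as equivalent to Unsure.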
Here is the code:
Input preparation:
//spark : My SparkSession
import spark.implicits._
val df = Seq(
(111111, "A", "True"),
(111111, "A", "False"),
(111111, "A", "False"),
(222222, "B", "False"),
(222222, "B", "False"),
(333333, "C", "True"),
(333333, "C", "True"),
(333333, "C", "Unsure")
).toDF("item_id", "location_id", "decision")
df.printSchema()
/** root
* |-- item_id: integer (nullable = false)
* |-- location_id: string (nullable = true)
* |-- decision: string (nullable = true)
*/
Code to produce the desired output:
import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window
// Step 1) Create a WindowSpec. Partitioning by item_id and location_id groups the rows
// the same way groupBy("item_id", "location_id") would, but a window also lets us order
// the rows inside each partition, here by col("decision").
val MyWindow = Window
  .partitionBy(col("item_id"), col("location_id"))
  .orderBy(desc("decision"))
df
  // Step 2) Number the rows of each partition according to MyWindow's ordering,
  // i.e. by col("decision") in descending order.
  .withColumn("row_number", row_number().over(MyWindow))
  // Step 3) Keep only the first row of each partition. Descending string order puts
  // "Unsure" before "True" before "False", which matches the order of preference.
  .filter(col("row_number").equalTo(1))
  .drop(col("row_number"))
  .orderBy(col("item_id"))
  .show(false)
/**
OUTPUT:
+-------+-----------+--------+
|item_id|location_id|decision|
+-------+-----------+--------+
|111111 |A |True |
|222222 |B |False |
|333333 |C |Unsure |
+-------+-----------+--------+
*/
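Ordering by desc("decision") works here only because the strings happen to compare as "Unsure" > "True" > "False"; a new value such as "Zinc" would sort ahead of all of them. A sketch that makes the preference explicit with a `when`/`otherwise` rank instead (the names `ExplicitPriority`, `decisionRank` and `bestPerKey` are illustrative):

```scala
import org.apache.spark.sql.{Column, DataFrame}
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, lit, row_number, when}

object ExplicitPriority {
  // Lower rank = more preferred; anything not listed (e.g. "False") gets rank 2,
  // so the result no longer depends on how the strings happen to compare.
  val decisionRank: Column =
    when(col("decision") === "Unsure", lit(0))
      .when(col("decision") === "True", lit(1))
      .otherwise(lit(2))

  def bestPerKey(df: DataFrame): DataFrame = {
    val w = Window
      .partitionBy(col("item_id"), col("location_id"))
      .orderBy(decisionRank.asc)
    df.withColumn("row_number", row_number().over(w))
      .filter(col("row_number") === 1)
      .drop("row_number")
  }
}
```

Adding a new preferred value then means adding one more `.when(...)` branch rather than relying on alphabetical order.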
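EDIT1 (based on the comments):
Improved code (without ordering by col("decision") in the WindowSpec):
For this you need to write a custom UserDefinedAggregateFunction, which gives you explicit control over which decision values exist and how they rank. For your requirement it could look like this: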
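import org.apache.spark.sql.Row
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types.{DataType, StringType, StructField, StructType}

// Note: UserDefinedAggregateFunction still works but is deprecated since Spark 3.0.
object MyBestDecisionUDF extends UserDefinedAggregateFunction {
  // step 1) : priority score for each decision value (this could come from configuration)
  val decisionOrderMap =
    Map("Unsure" -> 4, "True" -> 3, "False" -> 2, "Zinc" -> 1, "Copper" -> 0)

  // score of a buffer or input value; null and unknown values get -1
  private def score(value: Any): Int =
    Option(value).map(v => decisionOrderMap.getOrElse(v.toString, -1)).getOrElse(-1)

  /** all overridden functions come from the UserDefinedAggregateFunction abstract class
    */
  override def inputSchema: StructType = StructType(
    StructField("input_str", StringType, nullable = true) :: Nil
  )
  override def bufferSchema: StructType = StructType(
    StructField("buffer_str", StringType, nullable = false) :: Nil
  )
  override def dataType: DataType = StringType
  override def deterministic: Boolean = true
  override def initialize(buffer: MutableAggregationBuffer): Unit = {
    buffer.update(0, "")
  }
  override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
    // main step : keep the buffer holding the best decision seen so far
    if (score(buffer.get(0)) < score(input.get(0))) {
      buffer.update(0, input.getString(0))
    }
  }
  override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
    // combine two partial buffers (used when Spark aggregates in parts, e.g. in groupBy)
    if (score(buffer1.get(0)) < score(buffer2.get(0))) {
      buffer1.update(0, buffer2.getString(0))
    }
  }
  override def evaluate(buffer: Row): Any = buffer.getString(0)
}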
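Since Spark 3.0, `UserDefinedAggregateFunction` is deprecated in favour of a typed `org.apache.spark.sql.expressions.Aggregator` wrapped with `functions.udaf`. A minimal sketch of the same priority logic in that style, assuming Spark 3.0 or later; `BestDecisionAgg`, `BestDecisionAggUsage` and `bestPerKey` are hypothetical names:

```scala
import org.apache.spark.sql.{DataFrame, Encoder, Encoders}
import org.apache.spark.sql.expressions.Aggregator
import org.apache.spark.sql.functions.{col, udaf}

// Hypothetical typed replacement for MyBestDecisionUDF (udaf requires Spark 3.0+)
object BestDecisionAgg extends Aggregator[String, String, String] {
  // same priority scores as in MyBestDecisionUDF
  private val decisionOrderMap =
    Map("Unsure" -> 4, "True" -> 3, "False" -> 2, "Zinc" -> 1, "Copper" -> 0)

  // null and unknown values score -1, so they never replace a known decision
  private def score(d: String): Int =
    if (d == null) -1 else decisionOrderMap.getOrElse(d, -1)

  // keep the higher-scoring value; on a tie keep the current one
  private def better(current: String, candidate: String): String =
    if (score(candidate) > score(current)) candidate else current

  def zero: String = ""
  def reduce(buffer: String, input: String): String = better(buffer, input)
  def merge(b1: String, b2: String): String = better(b1, b2)
  def finish(buffer: String): String = buffer
  def bufferEncoder: Encoder[String] = Encoders.STRING
  def outputEncoder: Encoder[String] = Encoders.STRING
}

object BestDecisionAggUsage {
  // wrap the Aggregator as an untyped function usable on DataFrame columns
  val bestDecision = udaf(BestDecisionAgg, Encoders.STRING)

  // one output row per (item_id, location_id); no window or distinct needed
  def bestPerKey(df: DataFrame): DataFrame =
    df.groupBy("item_id", "location_id")
      .agg(bestDecision(col("decision")).as("my_best_decision"))
}
```

Because `merge` is implemented, the aggregate is safe to use in a plain `groupBy`, where Spark combines partial results from different partitions.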
/** ###############################################################
* Calling Custom UDAF
* ###############################################################
*/
/**
INPUT:
+-------+-----------+--------+
|item_id|location_id|decision|
+-------+-----------+--------+
|111111 |A |True |
|111111 |A |False |
|111111 |A |False |
|222222 |B |False |
|222222 |B |False |
|333333 |C |True |
|333333 |C |Unsure |
|444444 |D |Copper |
|444444 |D |Zinc |
+-------+-----------+--------+
*/
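df
  // my_best_decision: the UDAF's result over each (item_id, location_id) partition
  .withColumn(
    "my_best_decision",
    MyBestDecisionUDF(col("decision")).over(
      Window.partitionBy(col("item_id"), col("location_id"))
    )
  )
  // every row of a partition now carries the same value, so drop decision and deduplicate
  .drop(col("decision"))
  .distinct()
  .orderBy(col("item_id"))
  .show(false)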
/**
* OUTPUT:
+-------+-----------+----------------+
|item_id|location_id|my_best_decision|
+-------+-----------+----------------+
|111111 |A |True |
|222222 |B |False |
|333333 |C |Unsure |
|444444 |D |Zinc |
+-------+-----------+----------------+
*/
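If a custom aggregate feels heavy, the same priority map can also be applied with built-in functions only. This sketch swaps in a different technique than the answer's UDAF: it turns the map into a `when` chain, then takes `max` over a struct of (score, decision), relying on Spark comparing structs field by field. `StructMaxBestDecision` and `bestPerKey` are illustrative names.

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{col, lit, max, struct, when}

object StructMaxBestDecision {
  // Same priority scores as decisionOrderMap in MyBestDecisionUDF
  val decisionOrderMap: Map[String, Int] =
    Map("Unsure" -> 4, "True" -> 3, "False" -> 2, "Zinc" -> 1, "Copper" -> 0)

  def bestPerKey(df: DataFrame): DataFrame = {
    // Nested CASE WHEN built from the map; null or unknown decisions get -1
    val score = decisionOrderMap.foldLeft(lit(-1)) { case (acc, (value, s)) =>
      when(col("decision") === value, lit(s)).otherwise(acc)
    }
    df.groupBy("item_id", "location_id")
      // max compares the struct's score field first, so the best decision wins
      .agg(max(struct(score.as("score"), col("decision").as("decision"))).as("best"))
      .select(col("item_id"), col("location_id"), col("best.decision").as("my_best_decision"))
  }
}
```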
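Going off your comment on Lingaraj's answer, there is a more sophisticated solution than adding text to the decision values. You can create a Boolean column expression: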
import org.apache.spark.sql.functions.{col, row_number}
val decisionIsImportant = col("decision") === "True" || col("decision") === "Unsure" || col("decision") === "Zebra" || col("decision") === "Xylophone"
This Boolean expression evaluates to true if the decision column equals any of the values you want to prioritize.
Then you can create a Window and apply it to the DataFrame:
import org.apache.spark.sql.expressions.Window
val decisionSortWindow = Window
  .partitionBy(col("item_id"), col("location_id"))
  .orderBy(decisionIsImportant.desc)
df.withColumn("row_number", row_number().over(decisionSortWindow))
  .filter(col("row_number").equalTo(1))
  .drop(col("row_number"))
  .orderBy(col("item_id"))
  .show(false)
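Because all decision values for which this expression is true are sorted first (true comes before false in descending order), you can simply filter on col("row_number").equalTo(1): whenever a partition contains at least one important row, row number 1 is guaranteed to be one of them.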
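The chain of `===` comparisons can also be written with `Column.isin`, which keeps the list of important values in one Scala collection. A self-contained sketch of the same window approach, where `ImportantFirst`, `importantDecisions` and `bestPerKey` are hypothetical names and "Zebra"/"Xylophone" stand in for whatever extra values you need:

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, row_number}

object ImportantFirst {
  // Hypothetical list of "important" decision values; extend as needed
  val importantDecisions: Seq[String] = Seq("True", "Unsure", "Zebra", "Xylophone")

  def bestPerKey(df: DataFrame): DataFrame = {
    // true for rows whose decision should be kept when present
    val decisionIsImportant = col("decision").isin(importantDecisions: _*)
    val decisionSortWindow = Window
      .partitionBy(col("item_id"), col("location_id"))
      .orderBy(decisionIsImportant.desc) // true sorts before false
    df.withColumn("row_number", row_number().over(decisionSortWindow))
      .filter(col("row_number") === 1)
      .drop("row_number")
  }
}
```

Rows with equal importance are not ordered further, so which of several important (or several False) rows survives is arbitrary, which matches what the question allows.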