根据 Java 中列的自定义函数,在 Spark SQL 中删除重复行

Drop duplicates row in Spark SQL based on custom function on a column in Java

我正在尝试从 Java 中的 Spark SQL 中的数据集 中删除重复项。我的数据集包含三列。假设列的名称是 name, timestamp, and score。名称是员工姓名的字符串表示,时间戳是员工所做的 activity 的长(纪元表示)。分数是代表员工分数的整数字段。

现在,假设我有以下数据集:

Name --> timestamp       -->  scores 
John --> 1595239200000   -->  10  
John --> 1595242800000   -->  10
Bob  --> 1595246400000   -->  20
John --> 1595239200000   -->  10  

请注意,在上面的数据集中,第一行和第四行是相同的。

当我对上述数据集使用 distinct() 函数时,这样做

myDataset.distinct()

我得到的结果是

Name --> timestamp       -->  scores 
John --> 1595239200000   -->  10  
John --> 1595242800000   -->  10
Bob  --> 1595246400000   -->  20

本例中的第四行被删除,这是预期的行为。

我想要的是将 timestamp 字段转换为 yyyy-MM-dd 格式,然后使用名称字段的组合删除重复项。因此,从原始数据集中,第一行、第二行和第四行具有相同的日期值,即 Name = John 的 2020-07-20。我只想为名称设置一行 = 'John'.

因此,在如上所述删除重复行后,从上述数据集中生成的数据集将变为

Name --> timestamp       -->  scores 
John --> 1595239200000   -->  10  
Bob  --> 1595246400000   -->  20

请注意,我没有任何限制只为同名保留第一个时间戳。任何时间戳都适用于我,只要它们都属于同一日期。

到目前为止我尝试过的是

Dataset<Row> duplicateRemovedDataset = myDataset
                .dropDuplicates("Name", String.valueOf(functions.from_unixtime
                        (functions.col("timestamp").divide(1000), "yyyy-MM-dd")));

但这让我产生了这个错误

User class threw exception: org.apache.spark.sql.AnalysisException: 
Cannot resolve column name "from_unixtime((timestamp / 1000), yyyy-MM-dd)" among list of my column name

我应该怎么做?

或者更笼统的说法是如何在对数据集调用 dropDuplicates 时调用自定义函数?

试试这个:


val myDataset = Seq(("John","1595239200000",10),           
              ("John", "1595242800000" ,10),
             ("Bob", "1595246400000" ,20),
             ("John", "1595239200000" ,10)
            )
.toDF("Name", "timestamp","score")
myDataset.show()

+----+-------------+-----+
|Name|    timestamp|score|
+----+-------------+-----+
|John|1595239200000|   10|
|John|1595242800000|   10|
| Bob|1595246400000|   20|
|John|1595239200000|   10|
+----+-------------+-----+

import org.apache.spark.sql.functions.{col, to_date, to_timestamp}

myDataset.withColumn("datestamp",to_date(from_unixtime($"timestamp" / 1000))).dropDuplicates("name","datestamp").show()

+----+-------------+-----+----------+
|name|    timestamp|score| datestamp|
+----+-------------+-----+----------+
| Bob|1595246400000|   20|2020-07-20|
|John|1595239200000|   10|2020-07-20|
+----+-------------+-----+----------+

您可以创建一个新的 column 具有您需要的日期格式,并 drop 在您想要的列上重复,如下所示

对于Java

import static org.apache.spark.sql.functions.*;
Dataset<Row> resultDF = df.withColumn("date", to_date(to_timestamp(df.col("Timestamp").divide(1000)), "yyyy-MM-dd"));

resultDF.dropDuplicates("Name", "date")
        .drop("date")
        .show(false);

对于 Scala

import org.apache.spark.sql.functions._
val resultDF = df.withColumn("date", to_date(to_timestamp(col("Timestamp") / 1000), "yyyy-MM-dd"))

resultDF.dropDuplicates("Name", "date")
  .drop("date")
  .show(false)

输出:

+----+-------------+-----+
|Name|Timestamp    |score|
+----+-------------+-----+
|Bob |1595246400000|20   |
|John|1595239200000|10   |
+----+-------------+-----+