根据 Java 中列的自定义函数,在 Spark SQL 中删除重复行
Drop duplicates row in Spark SQL based on custom function on a column in Java
我正在尝试从 Java 中的 Spark SQL 中的数据集 中删除重复项。我的数据集包含三列。假设列的名称是 name, timestamp, and score
。名称是员工姓名的字符串表示,时间戳是员工所做的 activity 的长(纪元表示)。分数是代表员工分数的整数字段。
现在,假设我有以下数据集:
Name --> timestamp --> scores
John --> 1595239200000 --> 10
John --> 1595242800000 --> 10
Bob --> 1595246400000 --> 20
John --> 1595239200000 --> 10
请注意,在上面的数据集中,第一行和第四行是相同的。
当我对上述数据集使用 distinct() 函数时,这样做
myDataset.distinct()
我得到的结果是
Name --> timestamp --> scores
John --> 1595239200000 --> 10
John --> 1595242800000 --> 10
Bob --> 1595246400000 --> 20
本例中的第四行被删除,这是预期的行为。
我想要的是将 timestamp
字段转换为 yyyy-MM-dd
格式,然后使用名称字段的组合删除重复项。因此,从原始数据集中,第一行、第二行和第四行具有相同的日期值,即 Name = John 的 2020-07-20
。我只想为名称设置一行 = 'John'.
因此,在如上所述删除重复行后,从上述数据集中生成的数据集将变为
Name --> timestamp --> scores
John --> 1595239200000 --> 10
Bob --> 1595246400000 --> 20
请注意,我没有任何限制只为同名保留第一个时间戳。任何时间戳都适用于我,只要它们都属于同一日期。
到目前为止我尝试过的是
Dataset<Row> duplicateRemovedDataset = myDataset
.dropDuplicates("Name", String.valueOf(functions.from_unixtime
(functions.col("timestamp").divide(1000), "yyyy-MM-dd")));
但这让我产生了这个错误
User class threw exception: org.apache.spark.sql.AnalysisException:
Cannot resolve column name "from_unixtime((timestamp / 1000), yyyy-MM-dd)" among list of my column name
我应该怎么做?
或者更笼统的说法是如何在对数据集调用 dropDuplicates
时调用自定义函数?
试试这个:
val myDataset = Seq(("John","1595239200000",10),
("John", "1595242800000" ,10),
("Bob", "1595246400000" ,20),
("John", "1595239200000" ,10)
)
.toDF("Name", "timestamp","score")
myDataset.show()
+----+-------------+-----+
|Name| timestamp|score|
+----+-------------+-----+
|John|1595239200000| 10|
|John|1595242800000| 10|
| Bob|1595246400000| 20|
|John|1595239200000| 10|
+----+-------------+-----+
import org.apache.spark.sql.functions.{col, to_date, to_timestamp}
myDataset.withColumn("datestamp",to_date(from_unixtime($"timestamp" / 1000))).dropDuplicates("name","datestamp").show()
+----+-------------+-----+----------+
|name| timestamp|score| datestamp|
+----+-------------+-----+----------+
| Bob|1595246400000| 20|2020-07-20|
|John|1595239200000| 10|2020-07-20|
+----+-------------+-----+----------+
您可以创建一个新的 column
具有您需要的日期格式,并 drop
在您想要的列上重复,如下所示
对于Java
import static org.apache.spark.sql.functions.*;
Dataset<Row> resultDF = df.withColumn("date", to_date(to_timestamp(df.col("Timestamp").divide(1000)), "yyyy-MM-dd"));
resultDF.dropDuplicates("Name", "date")
.drop("date")
.show(false);
对于 Scala
import org.apache.spark.sql.functions._
val resultDF = df.withColumn("date", to_date(to_timestamp(col("Timestamp") / 1000), "yyyy-MM-dd"))
resultDF.dropDuplicates("Name", "date")
.drop("date")
.show(false)
输出:
+----+-------------+-----+
|Name|Timestamp |score|
+----+-------------+-----+
|Bob |1595246400000|20 |
|John|1595239200000|10 |
+----+-------------+-----+
我正在尝试从 Java 中的 Spark SQL 中的数据集 中删除重复项。我的数据集包含三列。假设列的名称是 name, timestamp, and score
。名称是员工姓名的字符串表示,时间戳是员工所做的 activity 的长(纪元表示)。分数是代表员工分数的整数字段。
现在,假设我有以下数据集:
Name --> timestamp --> scores
John --> 1595239200000 --> 10
John --> 1595242800000 --> 10
Bob --> 1595246400000 --> 20
John --> 1595239200000 --> 10
请注意,在上面的数据集中,第一行和第四行是相同的。
当我对上述数据集使用 distinct() 函数时,这样做
myDataset.distinct()
我得到的结果是
Name --> timestamp --> scores
John --> 1595239200000 --> 10
John --> 1595242800000 --> 10
Bob --> 1595246400000 --> 20
本例中的第四行被删除,这是预期的行为。
我想要的是将 timestamp
字段转换为 yyyy-MM-dd
格式,然后使用名称字段的组合删除重复项。因此,从原始数据集中,第一行、第二行和第四行具有相同的日期值,即 Name = John 的 2020-07-20
。我只想为名称设置一行 = 'John'.
因此,在如上所述删除重复行后,从上述数据集中生成的数据集将变为
Name --> timestamp --> scores
John --> 1595239200000 --> 10
Bob --> 1595246400000 --> 20
请注意,我没有任何限制只为同名保留第一个时间戳。任何时间戳都适用于我,只要它们都属于同一日期。
到目前为止我尝试过的是
Dataset<Row> duplicateRemovedDataset = myDataset
.dropDuplicates("Name", String.valueOf(functions.from_unixtime
(functions.col("timestamp").divide(1000), "yyyy-MM-dd")));
但这让我产生了这个错误
User class threw exception: org.apache.spark.sql.AnalysisException:
Cannot resolve column name "from_unixtime((timestamp / 1000), yyyy-MM-dd)" among list of my column name
我应该怎么做?
或者更笼统的说法是如何在对数据集调用 dropDuplicates
时调用自定义函数?
试试这个:
val myDataset = Seq(("John","1595239200000",10),
("John", "1595242800000" ,10),
("Bob", "1595246400000" ,20),
("John", "1595239200000" ,10)
)
.toDF("Name", "timestamp","score")
myDataset.show()
+----+-------------+-----+
|Name| timestamp|score|
+----+-------------+-----+
|John|1595239200000| 10|
|John|1595242800000| 10|
| Bob|1595246400000| 20|
|John|1595239200000| 10|
+----+-------------+-----+
import org.apache.spark.sql.functions.{col, to_date, to_timestamp}
myDataset.withColumn("datestamp",to_date(from_unixtime($"timestamp" / 1000))).dropDuplicates("name","datestamp").show()
+----+-------------+-----+----------+
|name| timestamp|score| datestamp|
+----+-------------+-----+----------+
| Bob|1595246400000| 20|2020-07-20|
|John|1595239200000| 10|2020-07-20|
+----+-------------+-----+----------+
您可以创建一个新的 column
具有您需要的日期格式,并 drop
在您想要的列上重复,如下所示
对于Java
import static org.apache.spark.sql.functions.*;
Dataset<Row> resultDF = df.withColumn("date", to_date(to_timestamp(df.col("Timestamp").divide(1000)), "yyyy-MM-dd"));
resultDF.dropDuplicates("Name", "date")
.drop("date")
.show(false);
对于 Scala
import org.apache.spark.sql.functions._
val resultDF = df.withColumn("date", to_date(to_timestamp(col("Timestamp") / 1000), "yyyy-MM-dd"))
resultDF.dropDuplicates("Name", "date")
.drop("date")
.show(false)
输出:
+----+-------------+-----+
|Name|Timestamp |score|
+----+-------------+-----+
|Bob |1595246400000|20 |
|John|1595239200000|10 |
+----+-------------+-----+