Select latest timestamp record after a window operation for every group in the data with Spark Scala
I compute attempt counts per (user, device) within a one-day (86400-second) window. I want to extract the row that carries the latest timestamp and its count, and drop the unnecessary earlier counts. Make sure your answer accounts for the time window: a user with one device can make several attempts within a day or a week, and for each specific window I want to retrieve the particular moment that carries the final count.
My initial dataset looks like this:
val df = sc.parallelize(Seq(
("user1", "iphone", "2017-12-22 10:06:18", "Success"),
("user1", "iphone", "2017-12-22 11:15:12", "failed"),
("user1", "iphone", "2017-12-22 12:06:18", "Success"),
("user1", "iphone", "2017-12-22 09:15:12", "failed"),
("user1", "iphone", "2017-12-20 10:06:18", "Success"),
("user1", "iphone", "2017-12-20 11:15:12", "failed"),
("user1", "iphone", "2017-12-20 12:06:18", "Success"),
("user1", "iphone", "2017-12-20 09:15:12", "failed"),
("user1", "android", "2017-12-20 09:25:20", "Success"),
("user1", "android", "2017-12-20 09:44:22", "Success"),
("user1", "android", "2017-12-20 09:58:22", "Success"),
("user1", "iphone", "2017-12-20 16:44:20", "Success"),
("user1", "iphone", "2017-12-20 16:44:25", "Success"),
("user1", "iphone", "2017-12-20 16:44:35", "Success")
)).toDF("username", "device", "date_time", "status")
The code I ran, and what I got:
// Basically I'm looking 1 day which is 86400 seconds
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

val w1 = Window.partitionBy("username", "device")
  .orderBy(col("date_time").cast("timestamp").cast("long").desc)
  .rangeBetween(-86400, 0)
val countEveryAttemptDF = df.withColumn("attempts", count("device").over(w1))
Now I have:
// countEveryAttemptDF.show
+--------+--------------+---------------------+-------+--------+
|username|        device|            date_time| status|attempts|
+--------+--------------+---------------------+-------+--------+
| user1| android| 2017-12-20 09:58:22|Success| 1|
| user1| android| 2017-12-20 09:44:22|Success| 2|
| user1| android| 2017-12-20 09:25:20|Success| 3|
| user1| iphone| 2017-12-22 12:06:18|Success| 1|
| user1| iphone| 2017-12-22 11:15:12| failed| 2|
| user1| iphone| 2017-12-22 10:06:18|Success| 3|
| user1| iphone| 2017-12-22 09:15:12| failed| 4|
| user1| iphone| 2017-12-20 16:44:35|Success| 1|
| user1| iphone| 2017-12-20 16:44:25|Success| 2|
| user1| iphone| 2017-12-20 16:44:20|Success| 3|
| user1| iphone| 2017-12-20 12:06:18|Success| 4|
| user1| iphone| 2017-12-20 11:15:12| failed| 5|
| user1| iphone| 2017-12-20 10:06:18|Success| 6|
| user1| iphone| 2017-12-20 09:15:12| failed| 7|
+--------+--------------+---------------------+-------+--------+
What I want:
I want the latest timestamp and its count, making sure I stay within the same time window.
+--------+--------------+---------------------+-------+--------+
|username|        device|            date_time| status|attempts|
+--------+--------------+---------------------+-------+--------+
| user1 | android | 2017-12-20 09:25:20|Success| 3|
| user1 | iphone | 2017-12-22 09:15:12| failed| 4|
| user1 | iphone | 2017-12-20 09:15:12| failed| 7|
+--------+--------------+---------------------+-------+--------+
You are almost there. You have already computed the counts by looking back over a one-day range. Now all you have to do is pick out the latest record within that one-day range, which can be done by applying last over the same window, but with the range reversed.
import org.apache.spark.sql.expressions._
import org.apache.spark.sql.functions._
def day(x: Int) = x * 86400
val w1 = Window.partitionBy("username", "device")
.orderBy(col("date_time").cast("timestamp").cast("long").desc)
.rangeBetween(-day(1), 0)
val w2 = Window.partitionBy("username", "device")
.orderBy(col("date_time").cast("timestamp").cast("long").desc)
.rangeBetween(0, day(1))
val countEveryAttemptDF = df.withColumn("attempts", count("device").over(w1))
.withColumn("att", last("attempts").over(w2))
.filter(col("attempts") === col("att"))
.drop("att")
which should give you
+--------+--------------+---------------------+-------+--------+
|username| device| date_time| status|attempts|
+--------+--------------+---------------------+-------+--------+
|user1 |android |2017-12-20 09:25:20 |Success|3 |
|user1   |iphone        |2017-12-22 09:15:12  | failed|4       |
|user1   |iphone        |2017-12-20 09:15:12  | failed|7       |
+--------+--------------+---------------------+-------+--------+
As mentioned in the comments below:
There are 86400 seconds in 1 day. I wanted to look back 1 day. Similarly 3600 seconds is 1 hour. And 604,800 seconds in 1 week
You can change the day function to hours and weeks as shown below, and use them in the window's rangeBetween:
def hour(x: Int) = x * 3600
def week(x: Int) = x * 604800
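As a quick sanity check on the arithmetic (plain Scala, no Spark needed), these helpers just convert a number of time units into seconds, matching the epoch-seconds ordering column used by rangeBetween:

```scala
// Convert days/hours/weeks to seconds, for use as rangeBetween
// offsets (the ordering column is the timestamp cast to epoch seconds).
def day(x: Int): Int  = x * 86400
def hour(x: Int): Int = x * 3600
def week(x: Int): Int = x * 604800

println(hour(6))          // 21600
println(week(1))          // 604800
println(day(7) == week(1)) // true: 7 days and 1 week are the same range
```

So a 6-hour look-back window would use `.rangeBetween(-hour(6), 0)`, and a one-week look-back `.rangeBetween(-week(1), 0)`, with the rest of the window definition unchanged.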
Hope the answer is helpful.