How to calculate the difference of rows based on the other column value partitioned by user_id

I have raw JSON data like below:

{"event" : "login","time" : "2019-11-20 00:14:46","user_id" : 978699}
{"event" : "logout","time" : "2019-11-20 00:14:46","user_id" : 992210}
{"event" : "login","time" : "2019-11-20 00:14:46","user_id" : 823323}
{"event" : "logout","time" : "2019-11-20 00:14:47","user_id" : 978699}
{"event" : "logout","time" : "2019-11-20 00:14:47","user_id" : 823323}
{"event" : "login","time" : "2019-11-20 00:14:50","user_id" : 978699}
{"event" : "logout","time" : "2019-11-20 00:14:57","user_id" : 978699}

I created a DataFrame for the raw data above:

val rawDataFrame = sparkSession.read.option("multiline", "true").json(cleanJsonLines)

I need to find out how many seconds each user was logged in to our system. The expected final result is:

{"user_id": 978699, "logged_in_sec":8} // (2019-11-20 00:14:47 - 2019-11-20 00:14:46) + (2019-11-20 00:14:57 - 2019-11-20 00:14:50)
{"user_id": 992210, "logged_in_sec":0}
{"user_id": 823323, "logged_in_sec":1}

I am new to Spark and Scala and don't know how to use a window function here to solve this.

I don't want to write procedural code that iterates over every row of the DataFrame and computes the difference between the previous "login" and the current "logout" event for each user_id.

Please guide me toward an approach for this problem. Thanks for reading.

Update:

There is a catch in the raw data: logins and logouts are not always strictly alternating. What I mean is that the events for a user_id can also arrive in the order shown below; another logout can occur after several logouts, and similarly a login can occur after several login events.

{"event" : "login","time" : "2019-11-20 00:14:46","user_id" : 978699}
{"event" : "logout","time" : "2019-11-20 00:14:46","user_id" : 992210}
{"event" : "login","time" : "2019-11-20 00:14:46","user_id" : 823323}
{"event" : "logout","time" : "2019-11-20 00:14:47","user_id" : 978699}
{"event" : "logout","time" : "2019-11-20 00:14:47","user_id" : 823323}
{"event" : "login","time" : "2019-11-20 00:14:50","user_id" : 978699}
{"event" : "logout","time" : "2019-11-20 00:14:55","user_id" : 978699}
{"event" : "logout","time" : "2019-11-20 00:14:56","user_id" : 978699}    
{"event" : "logout","time" : "2019-11-20 00:14:57","user_id" : 978699}

So, for the requested aggregation: whenever I get a group of consecutive "logout" events, I have to take the last login from the group of login events that precedes the first "logout" and create a window, as illustrated below (a rough code sketch follows the diagram).

login 
login
login <- window start
logout <- window end
logout
login <- window start
logout <- window end
login <- window start
logout <- window end
logout
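
For illustration only, a minimal sketch of how such a "window start" could be flagged with Spark's lead window function; events, byUser, flagged and next_event are illustrative names, and time is assumed to already be cast to timestamp:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

// order each user's events by time
val byUser = Window.partitionBy("user_id").orderBy("time")

// a "window start" in the diagram above is a login whose very next event is a logout
val flagged = events
  .withColumn("next_event", lead(col("event"), 1).over(byUser))
  .withColumn("is_window_start", col("event") === "login" && col("next_event") === "logout")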

Update 2

So, following the approach above, the windows for user_id "978699" would look like this:

window 1

{"event" : "login","time" : "2019-11-20 00:14:46","user_id" : 978699}
{"event" : "logout","time" : "2019-11-20 00:14:47","user_id" : 978699}

window 2

{"event" : "login","time" : "2019-11-20 00:14:50","user_id" : 978699}
{"event" : "logout","time" : "2019-11-20 00:14:55","user_id" : 978699}

The total logged-in time is (47 - 46) + (55 - 50) = 6 seconds.

New approach

I added an index to the login and logout events, working on the assumption that the number of matched logout events cannot exceed the number of login events. Using this assumption, I got it to work.

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import spark.implicits._ // for the $"..." column syntax (assumes the SparkSession is named spark)

// df is the DataFrame built from the raw JSON above (columns: event, time, user_id)
val idPartition = Window.partitionBy("user_id").orderBy("time")

// running count of login and logout events per user, ordered by time
val df2 = df.withColumn("login_index", sum(when($"event" === "login", 1)).over(idPartition))
            .withColumn("logout_index", sum(when($"event" === "logout", 1)).over(idPartition))

df2.show(false)



val login = df2.where($"event" === "login")
                .withColumnRenamed("time", "login_time")
                .drop("logout_index")

login.show(false)

val logout = df2.where($"event" === "logout")
                .withColumnRenamed("time", "logout_time")
                .drop("login_index")

logout.show(false)

val finaldf = login.as("a").join(logout.as("b"), $"a.login_index" === $"b.logout_index"  && $"a.user_id" === $"b.user_id", "inner")
                .withColumn("session_time", unix_timestamp($"b.logout_time") - unix_timestamp($"a.login_time"))
                .select("a.login_time", "b.logout_time", "a.user_id", "a.login_index", "b.logout_index", "session_time")

finaldf.show(false)

val result = finaldf.groupBy("user_id").agg(sum("session_time") as "logged_in_sec")

result.show(false)

The idea is fairly simple, but the code is not. First, assign a running index to the login and logout events and check the maximum value of each index. You will then notice that some users have more logout events than can be matched with login events.
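
To see which rows those are, one quick check (a small addition, not part of the code above) is to compare the highest index of each kind per user:

// users whose max logout_index exceeds their max login_index have trailing
// logouts that the inner join on the indexes will simply drop
df2.groupBy("user_id")
   .agg(max("login_index").as("max_login_index"),
        max("logout_index").as("max_logout_index"))
   .show(false)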

The outputs of df2, login, logout, finaldf and result, in order:

+------+-------------------+-------+-----------+------------+
|event |time               |user_id|login_index|logout_index|
+------+-------------------+-------+-----------+------------+
|logout|2019-11-20 00:14:46|992210 |null       |1           |
|login |2019-11-20 00:14:46|978699 |1          |null        |
|logout|2019-11-20 00:14:47|978699 |1          |1           |
|login |2019-11-20 00:14:50|978699 |2          |1           |
|logout|2019-11-20 00:14:55|978699 |2          |2           |
|logout|2019-11-20 00:14:56|978699 |2          |3           |
|logout|2019-11-20 00:14:57|978699 |2          |4           |
|login |2019-11-20 00:14:46|823323 |1          |null        |
|logout|2019-11-20 00:14:47|823323 |1          |1           |
+------+-------------------+-------+-----------+------------+

+-----+-------------------+-------+-----------+
|event|login_time         |user_id|login_index|
+-----+-------------------+-------+-----------+
|login|2019-11-20 00:14:46|978699 |1          |
|login|2019-11-20 00:14:50|978699 |2          |
|login|2019-11-20 00:14:46|823323 |1          |
+-----+-------------------+-------+-----------+

+------+-------------------+-------+------------+
|event |logout_time        |user_id|logout_index|
+------+-------------------+-------+------------+
|logout|2019-11-20 00:14:46|992210 |1           |
|logout|2019-11-20 00:14:47|978699 |1           |
|logout|2019-11-20 00:14:55|978699 |2           |
|logout|2019-11-20 00:14:56|978699 |3           |
|logout|2019-11-20 00:14:57|978699 |4           |
|logout|2019-11-20 00:14:47|823323 |1           |
+------+-------------------+-------+------------+

+-------------------+-------------------+-------+-----------+------------+------------+
|login_time         |logout_time        |user_id|login_index|logout_index|session_time|
+-------------------+-------------------+-------+-----------+------------+------------+
|2019-11-20 00:14:46|2019-11-20 00:14:47|978699 |1          |1           |1           |
|2019-11-20 00:14:50|2019-11-20 00:14:55|978699 |2          |2           |5           |
|2019-11-20 00:14:46|2019-11-20 00:14:47|823323 |1          |1           |1           |
+-------------------+-------------------+-------+-----------+------------+------------+

+-------+-------------+
|user_id|logged_in_sec|
+-------+-------------+
|978699 |6            |
|823323 |1            |
+-------+-------------+

The last output above is the final result.

Here you go:

**source extraction**
val rawDataFrame = spark.read.format("json").load("../cleanjsonLines.json")
rawDataFrame.printSchema

root
 |-- event: string (nullable = true)
 |-- time: string (nullable = true)
 |-- user_id: long (nullable = true)

**casting to timestamp**
val tfDataFrame = rawDataFrame.selectExpr("event","to_timestamp(time) as time","user_id")
tfDataFrame.printSchema

root
 |-- event: string (nullable = true)
 |-- time: timestamp (nullable = true)
 |-- user_id: long (nullable = true)

**creating temp view**
tfDataFrame.createOrReplaceTempView("SysEvent")

**creating windowed temp view for each valid session**
spark.sql("select * from (select *,lag(event,-1) over (partition by user_id  order by time) as next_event, lag(time,-1) over (partition by user_id order by time) as next_time from SysEvent) a where event = 'login' and next_event = 'logout' order by user_id,time").createOrReplaceTempView("WindowSysEvent")
spark.sql("select * from WindowSysEvent").show()

Result for source dataset:    
+-----+-------------------+-------+----------+-------------------+
|event|               time|user_id|next_event|          next_time|
+-----+-------------------+-------+----------+-------------------+
|login|2019-11-20 00:14:46| 823323|    logout|2019-11-20 00:14:47|
|login|2019-11-20 00:14:46| 978699|    logout|2019-11-20 00:14:47|
|login|2019-11-20 00:14:50| 978699|    logout|2019-11-20 00:14:57|
+-----+-------------------+-------+----------+-------------------+

Result for updated dataset:
+-----+-------------------+-------+----------+-------------------+
|event|               time|user_id|next_event|          next_time|
+-----+-------------------+-------+----------+-------------------+
|login|2019-11-20 00:14:46| 823323|    logout|2019-11-20 00:14:47|
|login|2019-11-20 00:14:46| 978699|    logout|2019-11-20 00:14:47|
|login|2019-11-20 00:14:50| 978699|    logout|2019-11-20 00:14:55|
+-----+-------------------+-------+----------+-------------------+

**aggregation for valid sessions**
val result = spark.sql("select user_id, sum(unix_timestamp(next_time) - unix_timestamp(time)) as logged_in_sec from WindowSysEvent group by user_id")
result.show()

Result for source dataset:
+-------+-------------+
|user_id|logged_in_sec|
+-------+-------------+
| 978699|            8|
| 823323|            1|
+-------+-------------+

Result for updated dataset:
+-------+-------------+
|user_id|logged_in_sec|
+-------+-------------+
| 978699|            6|
| 823323|            1|
+-------+-------------+

**write to target**
result.coalesce(1).write.format("json").save("../result.json")

Result for source dataset:
{"user_id":823323,"logged_in_sec":1}
{"user_id":978699,"logged_in_sec":8}

Result for updated dataset:
{"user_id":823323,"logged_in_sec":1}
{"user_id":978699,"logged_in_sec":6}

result.explain
== Physical Plan ==
*(5) HashAggregate(keys=[user_id#184L], functions=[sum((unix_timestamp(next_time#898, yyyy-MM-dd HH:mm:ss, Some(..)) - unix_timestamp(time#188, yyyy-MM-dd HH:mm:ss, Some(..))))])
+- Exchange hashpartitioning(user_id#184L, 200)
   +- *(4) HashAggregate(keys=[user_id#184L], functions=[partial_sum((unix_timestamp(next_time#898, yyyy-MM-dd HH:mm:ss, Some(..)) - unix_timestamp(time#188, yyyy-MM-dd HH:mm:ss, Some(..))))])
      +- *(4) Sort [user_id#184L ASC NULLS FIRST, time#188 ASC NULLS FIRST], true, 0
         +- Exchange rangepartitioning(user_id#184L ASC NULLS FIRST, time#188 ASC NULLS FIRST, 200)
            +- *(3) Project [time#188, user_id#184L, next_time#898]
               +- *(3) Filter (((isnotnull(event#182) && isnotnull(next_event#897)) && (event#182 = login)) && (next_event#897 = logout))
                  +- Window [lag(event#182, -1, null) windowspecdefinition(user_id#184L, time#188 ASC NULLS FIRST, specifiedwindowframe(RowFrame, 1, 1)) AS next_event#897, lag(time#188, -1, null) windowspecdefinition(user_id#184L, time#188 ASC NULLS FIRST, specifiedwindowframe(RowFrame, 1, 1)) AS next_time#898], [user_id#184L], [time#188 ASC NULLS FIRST]
                     +- *(2) Sort [user_id#184L ASC NULLS FIRST, time#188 ASC NULLS FIRST], false, 0
                        +- Exchange hashpartitioning(user_id#184L, 200)
                           +- *(1) Project [event#182, cast(time#183 as timestamp) AS time#188, user_id#184L]
                              +- *(1) FileScan json [event#182,time#183,user_id#184L] Batched: false, Format: JSON, Location: InMemoryFileIndex[file:../cleanjsonLines.json], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<event:string,time:string,user_id:bigint>
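
For reference, the same pairing can also be written with the DataFrame API (lag(x, -1) in the SQL above is equivalent to lead(x, 1)). A sketch assuming the tfDataFrame created earlier; w, paired and dfResult are illustrative names:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

val w = Window.partitionBy("user_id").orderBy("time")

// keep each login that is immediately followed by a logout, then sum the gaps
val paired = tfDataFrame
  .withColumn("next_event", lead(col("event"), 1).over(w))
  .withColumn("next_time", lead(col("time"), 1).over(w))
  .where(col("event") === "login" && col("next_event") === "logout")

val dfResult = paired.groupBy("user_id")
  .agg(sum(unix_timestamp(col("next_time")) - unix_timestamp(col("time"))).as("logged_in_sec"))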

Raw data

{"user_id":346214,"event":"logout","time":"2019-11-20 00:19:41"}
{"user_id":346214,"event":"login","time":"2019-11-20 00:19:43"}
{"user_id":346214,"event":"logout","time":"2019-11-20 00:22:09"}
{"user_id":346214,"event":"login","time":"2019-11-20 00:22:12"}
{"user_id":346214,"event":"logout","time":"2019-11-20 00:24:12"}
{"user_id":346214,"event":"login","time":"2019-11-20 00:24:14"}
{"user_id":346214,"event":"logout","time":"2019-11-20 00:25:43"}
{"user_id":346214,"event":"login","time":"2019-11-20 00:25:45"}
{"user_id":346214,"event":"logout","time":"2019-11-20 00:29:55"}
{"user_id":346214,"event":"login","time":"2019-11-20 00:29:57"}
{"user_id":346214,"event":"logout","time":"2019-11-20 00:30:00"}


// imports needed for the snippets below
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import org.apache.spark.sql.SaveMode

//create dataframe with only login events sorted by user_id, time
val leftDF = rawDataFrame.filter(col("event") === lit("login")).orderBy("user_id", "time")
leftDF.show()

//create dataframe with only logout events sorted by user_id, time
val rightDF = rawDataFrame.filter(col("event")===lit("logout")).orderBy("user_id","time")
rightDF.show()

// join left and right dataframes such that the logoutDF row time is greater than or equal to the loginDF row time.
val joinedDF = leftDF.as("loginDF")
  .join(rightDF.as("logoutDF"),
    col("logoutDF.time") >= col("loginDF.time")
      &&
      col("loginDF.user_id") === col("logoutDF.user_id"),"left")
  .orderBy("loginDF.user_id","loginDF.time","logoutDF.time")
  .groupBy(col("loginDF.user_id").as("user_id"),col("loginDF.time").as("login"))
  .agg(first("logoutDF.time").as("logout"))
  .orderBy("user_id","login","logout")
// this will create data like the rows below; now we have to remove the overlap from that data

{"user_id":346214,"login":"2019-11-20 00:25:45","logout":"2019-11-20 00:29:55","group_id":4,"updated_login":"2019-11-20 00:25:45","update_logout":"2019-11-20 00:29:55","session_time":250}
{"user_id":346214,"login":"2019-11-20 00:24:14","logout":"2019-11-20 00:25:43","group_id":3,"updated_login":"2019-11-20 00:24:14","update_logout":"2019-11-20 00:25:43","session_time":89}
{"user_id":346214,"login":"2019-11-20 00:29:57","logout":"2019-11-20 00:30:00","group_id":5,"updated_login":"2019-11-20 00:29:57","update_logout":"2019-11-20 00:30:00","session_time":3}
{"user_id":346214,"login":"2019-11-20 00:22:12","logout":"2019-11-20 00:24:12","group_id":2,"updated_login":"2019-11-20 00:22:12","update_logout":"2019-11-20 00:24:12","session_time":120}
{"user_id":346214,"login":"2019-11-20 00:19:43","logout":"2019-11-20 00:22:09","group_id":1,"updated_login":"2019-11-20 00:19:43","update_logout":"2019-11-20 00:22:09","session_time":146}

// to remove the overlap, I followed this post


val win1 = Window.partitionBy(col("user_id")).orderBy(col("login"), col("logout"))
val win2 = Window.partitionBy(col("user_id"), col("group_id"))

val finalDF = joinedDF
  // start a new group whenever the current login does not fall inside the previous interval
  .withColumn("group_id", when(
    col("login").between(lag(col("login"), 1).over(win1), lag(col("logout"), 1).over(win1)), null
  ).otherwise(monotonically_increasing_id()))
  // carry the last non-null group id forward so overlapping rows share a group
  .withColumn("group_id", last(col("group_id"), ignoreNulls = true)
    .over(win1.rowsBetween(Window.unboundedPreceding, 0)))
  // collapse each group to its earliest login and latest logout
  .withColumn("updated_login", min(col("login")).over(win2))
  .withColumn("update_logout", max(col("logout")).over(win2))
  .orderBy("user_id", "login", "logout")
  .dropDuplicates(Seq("user_id", "updated_login", "update_logout"))
  .withColumn("session_time", unix_timestamp(col("update_logout")) - unix_timestamp(col("updated_login")))

val result = finalDF.groupBy("user_id").agg((sum("session_time")/60) as "logged_in_min").filter(col("logged_in_min").isNotNull)

//this will generate the data below
{"user_id":346214,"logged_in_min":10.133333333333333}

result.coalesce(1).write.format("json").mode(SaveMode.Overwrite).save("../final_result.json")
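
If the fractional minutes above are not wanted, the same aggregation can round the value instead (a small variation; resultRounded is an illustrative name):

// same aggregation as above, rounded to two decimal places
val resultRounded = finalDF.groupBy("user_id")
  .agg(round(sum("session_time") / 60, 2).as("logged_in_min"))
  .filter(col("logged_in_min").isNotNull)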