pyspark: find time diffs between rows with conditions
Hi, I am trying to compute the sum of differences (in milliseconds) between values in one column, conditioned on the value of another column.
In more detail, I have the following PySpark DataFrame:
d = spark.createDataFrame(
[(133515, "user1", 1562889600046, 'begin'),
(789456, "user2", 1562889600246, 'begin'),
(789456, "user2", 1562889603046, 'end'),
(712346, "user3", 1562889600046, 'begin'),
(789456, "user4", 1562889700046, 'begin'),
(133515, "user1", 1562889640046, 'end'),
(712346, "user3", 1562889602046, 'end'),
(789456, "user4", 1562889800046, 'end'),
(789456, "user4", 1562889850046, 'begin'),
(789456, "user4", 1562889903046, 'end'),
(133515, "user1", 1562889645046, 'begin'),
(133515, "user1", 1562889745046, 'end')
], ("ID", "user", "epoch", "ACTION"))
d.show()
I would like to get the following output:
+------+-----+-----------+
| ID| user|summed diff|
+------+-----+-----------+
|133515|user1| 50000|
|789456|user2| 2800|
|712346|user3| 2000|
|789456|user4| 153000|
+------+-----+-----------+
Each value in the summed diff column is obtained by summing the millisecond differences between each "end" epoch and the preceding "begin" epoch for that particular user.
Could you guide me on how to solve this?
What if I also want to group by day, or by hour of the day?
Try this:
from pyspark.sql import functions as F
from pyspark.sql.functions import lit
d_final = (
    d.select(
        "ID", "user",
        # Negate "begin" epochs so that summing the column yields end - begin per pair
        F.when(F.col("ACTION") == lit("begin"), -F.col("epoch"))
         .otherwise(F.col("epoch"))
         .alias("epoch_temp"))
     .groupBy("ID", "user")
     .agg(F.sum("epoch_temp").alias("summed_diff"))
)
Result:
>>> d_final.show()
+------+-----+-----------+
| ID| user|summed_diff|
+------+-----+-----------+
|789456|user4| 153000|
|712346|user3| 2000|
|133515|user1| 140000|
|789456|user2| 2800|
+------+-----+-----------+
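The sign trick works because summing -begin + end over each matched pair is exactly the same as summing end - begin pair by pair. A quick plain-Python sanity check, using user4's two intervals transcribed from the question's data:

# user4's two (begin, end) epoch pairs from the question, in milliseconds
pairs = [(1562889700046, 1562889800046),
         (1562889850046, 1562889903046)]

# Direct per-pair differences
paired_sum = sum(end - begin for begin, end in pairs)

# Sign-flip formulation: negate every "begin", keep every "end", sum everything
flipped_sum = sum(-b for b, _ in pairs) + sum(e for _, e in pairs)

print(paired_sum, flipped_sum)  # 153000 153000

Note this only holds when every "begin" has a matching "end" in the same group; an unmatched "begin" leaves a large negative residue in the sum.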
Edit - using a udf looks cleaner:
from pyspark.sql import functions as F
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType

# Map "begin" to -1 and "end" to +1, then multiply into the epoch
action_process = udf(lambda x: -1 if x == "begin" else 1, IntegerType())

d_final = (
    d.select(
        "ID", "user",
        (action_process(F.col("ACTION")) * F.col("epoch")).alias("epoch_temp"))
     .groupBy("ID", "user")
     .agg(F.sum("epoch_temp").alias("summed_diff"))
)
Result:
>>> d_final.show()
+------+-----+-----------+
| ID| user|summed_diff|
+------+-----+-----------+
|789456|user4| 153000|
|712346|user3| 2000|
|133515|user1| 140000|
|789456|user2| 2800|
+------+-----+-----------+
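On the follow-up about grouping by day or hour of day: one way is to derive calendar columns from the millisecond epoch and add them to the grouping key. A minimal sketch, assuming all timestamps should be interpreted in UTC (the ts, day, and hour column names are my own additions, not from the question):

from pyspark.sql import SparkSession, functions as F

# Pin the session timezone so the derived day/hour columns are deterministic
spark = (SparkSession.builder
         .config("spark.sql.session.timeZone", "UTC")
         .getOrCreate())

d = spark.createDataFrame(
    [(133515, "user1", 1562889600046, 'begin'),
     (789456, "user2", 1562889600246, 'begin'),
     (789456, "user2", 1562889603046, 'end'),
     (712346, "user3", 1562889600046, 'begin'),
     (789456, "user4", 1562889700046, 'begin'),
     (133515, "user1", 1562889640046, 'end'),
     (712346, "user3", 1562889602046, 'end'),
     (789456, "user4", 1562889800046, 'end'),
     (789456, "user4", 1562889850046, 'begin'),
     (789456, "user4", 1562889903046, 'end'),
     (133515, "user1", 1562889645046, 'begin'),
     (133515, "user1", 1562889745046, 'end')
    ], ("ID", "user", "epoch", "ACTION"))

d_grouped = (
    d.withColumn("ts", (F.col("epoch") / 1000).cast("timestamp"))  # ms -> timestamp
     .withColumn("day", F.to_date("ts"))
     .withColumn("hour", F.hour("ts"))
     .withColumn("epoch_temp",
                 F.when(F.col("ACTION") == "begin", -F.col("epoch"))
                  .otherwise(F.col("epoch")))
     .groupBy("ID", "user", "day", "hour")
     .agg(F.sum("epoch_temp").alias("summed_diff"))
)
d_grouped.show()

Caveat: this assumes every begin/end pair falls entirely inside the same day (or hour). A pair that straddles a boundary would land in two different groups, leaving each group with an unmatched half and breaking the sign-flip sum; handling that case would require pairing begin/end rows explicitly first.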