How to group by columns from two dataframes and then apply an aggregate difference function between rows?
I have two dataframes as follows:
+--------+----------+------+-------------------+
|readerId|locationId|userId| timestamp|
+--------+----------+------+-------------------+
| R2| l1| u2|2018-04-12 05:00:00|
| R1| l1| u1|2018-04-12 05:00:00|
| R3| l3| u3|2018-04-12 05:00:00|
+--------+----------+------+-------------------+
+--------+----------+------+-------------------+
|readerId|locationId|userId| timestamp|
+--------+----------+------+-------------------+
| R1| l1| u1|2018-04-12 07:00:00|
| R2| l1| u2|2018-04-12 10:00:00|
| R3| l3| u3|2018-04-12 07:00:00|
+--------+----------+------+-------------------+
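I want to group by readerId and locationId and then find the difference between the timestamps within each group. For example, for readerId R1 and locationId l1, the timestamps differ by 2 hours.
I achieved this by joining the two dataframes and using withColumn: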
val joinedDf = asKuduDf.join(
asOutToInDf,
col("kdf.locationId") <=> col("outInDf.locationId") &&
(col("kdf.readerId") <=> col("outInDf.readerId")),
"inner")
// Time logged in, in minutes
val timestampDf = joinedDf.withColumn(
"totalTime",
((unix_timestamp($"outInDf.timestamp") -
unix_timestamp($"kdf.timestamp"))/60).cast("long")
).toDF()
Is there a better way? I also tried the following:
val unionDf = outToInDf.union(kuduDf)
val timeDiffDf = unionDf.groupBy($"readerId", $"locationId").agg(diff($"timestamp"))
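But the problem with the above approach is that there is no 'diff' function.
join is the right solution here. In general, a GROUP BY aggregation is not an option, especially if (readerId, locationId) is not a unique identifier.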
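You could write
unionDf
  .groupBy($"readerId", $"locationId")
  // Parenthesize the subtraction so the division by 60 applies to the whole difference
  .agg(((max($"timestamp").cast("long") - min($"timestamp").cast("long")) / 60).alias("diff"))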
but this is a highly artificial solution with no advantage over join. It is also sensitive to subtle data issues.
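One such data issue can be sketched with a key that appears in only one dataframe. The following is a minimal sketch, not the asker's actual setup: the sample rows, the aliases a and b, and the names inDf, outDf, and JoinVsGroupBySketch are all made up for illustration, and the timestamps are assumed to be strings in the default yyyy-MM-dd HH:mm:ss format. The inner join drops the unmatched key, while the union plus groupBy version reports a difference of 0 for it, which is hard to tell apart from a real result.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{col, max, min, unix_timestamp}

object JoinVsGroupBySketch {
  // Returns (join result, groupBy result) as readerId -> minutes.
  def compare(spark: SparkSession): (Map[String, Long], Map[String, Long]) = {
    import spark.implicits._

    // Hypothetical data: R3 has a row here but no counterpart in outDf.
    val inDf = Seq(
      ("R1", "l1", "2018-04-12 05:00:00"),
      ("R3", "l3", "2018-04-12 05:00:00")
    ).toDF("readerId", "locationId", "timestamp")

    val outDf = Seq(
      ("R1", "l1", "2018-04-12 07:00:00")
    ).toDF("readerId", "locationId", "timestamp")

    // Join: each row is paired with its counterpart; unmatched keys are dropped.
    val joined = inDf.as("a")
      .join(outDf.as("b"),
        col("a.readerId") === col("b.readerId") &&
          col("a.locationId") === col("b.locationId"),
        "inner")
      .select(
        col("a.readerId"),
        ((unix_timestamp(col("b.timestamp")) - unix_timestamp(col("a.timestamp"))) / 60)
          .cast("long").as("minutes"))

    // Union + groupBy: a key with a single row yields max == min, i.e. 0 minutes.
    val grouped = inDf.union(outDf)
      .groupBy(col("readerId"), col("locationId"))
      .agg(((max(unix_timestamp(col("timestamp"))) -
        min(unix_timestamp(col("timestamp")))) / 60).cast("long").as("minutes"))
      .select("readerId", "minutes")

    def toMap(df: DataFrame): Map[String, Long] =
      df.collect().map(r => r.getString(0) -> r.getLong(1)).toMap

    (toMap(joined), toMap(grouped))
  }
}
```

With this data the join result contains only R1, while the grouped result also contains R3 with 0 minutes.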
You can merge the two dataframes with union and compute the difference in an aggregation as
val mergedDF = asKuduDf.union(asOutToInDf)
.groupBy($"readerId", $"locationId")
.agg(collect_list($"timestamp").as("time"))
mergedDF.withColumn("dif",
abs(unix_timestamp($"time" (0)) - unix_timestamp($"time" (1))) / 60
)
Output:
+--------+----------+------------------------------------------+-----+
|readerId|locationId|time |dif |
+--------+----------+------------------------------------------+-----+
|R3 |l3 |[2018-04-12 05:00:00, 2018-04-12 07:00:00]|120.0|
|R2 |l1 |[2018-04-12 05:00:00, 2018-04-12 10:00:00]|300.0|
|R1 |l1 |[2018-04-12 05:00:00, 2018-04-12 07:00:00]|120.0|
+--------+----------+------------------------------------------+-----+
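A caveat on the approach above: collect_list does not guarantee the order of the collected elements, which is why abs is needed. A possible variation, sketched here under the assumption that every (readerId, locationId) group has exactly two rows, is to collect the epoch seconds and sort them with sort_array so that the element order is deterministic. The names SortedCollectSketch, minutesPerKey, and secs are hypothetical.

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{col, collect_list, sort_array, unix_timestamp}

object SortedCollectSketch {
  // Assumes both inputs have the same columns, including readerId,
  // locationId, and a timestamp column that unix_timestamp can parse.
  def minutesPerKey(df1: DataFrame, df2: DataFrame): DataFrame =
    df1.union(df2)
      .groupBy(col("readerId"), col("locationId"))
      // Collect epoch seconds per group and sort ascending.
      .agg(sort_array(collect_list(unix_timestamp(col("timestamp")))).as("secs"))
      // Later timestamp minus earlier timestamp, converted to minutes.
      .withColumn("dif", (col("secs").getItem(1) - col("secs").getItem(0)) / 60)
      .drop("secs")
}
```

Calling SortedCollectSketch.minutesPerKey(asKuduDf, asOutToInDf) on the sample data should give the same dif values as the table above. Groups with more or fewer than two rows would need separate handling.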
Hope this helps!