How do I use flatmap with multiple columns in Dataframe using Pyspark
I have a DF like the following:
Name city starttime endtime
user1 London 2019-08-02 03:34:45 2019-08-02 03:52:03
user2 Boston 2019-08-13 13:34:10 2019-08-13 15:02:10
I want to check the endtime, and if it spills over into the next hour, update the current record so it ends at the last minute/second of the current hour and append one or more extra rows with similar data, as shown below (user2). Should I use flatMap, convert the DF to an RDD and use map, or something else?
Name city starttime endtime
user1 London 2019-08-02 03:34:45 2019-08-02 03:52:03
user2 Boston 2019-08-13 13:34:10 2019-08-13 13:59:59
user2 Boston 2019-08-13 14:00:00 2019-08-13 14:59:59
user2 Boston 2019-08-13 15:00:00 2019-08-13 15:02:10
Thanks
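If you do want to go through the RDD API as you suggest, a flatMap that expands each row into one row per hour could look roughly like this (a minimal sketch: it assumes starttime/endtime are plain strings in yyyy-MM-dd HH:mm:ss format and that the column names match the sample above):

from datetime import datetime, timedelta

FMT = "%Y-%m-%d %H:%M:%S"

def split_by_hour(row):
    # parse the (assumed string) timestamps, then emit one output row per hour touched
    start = datetime.strptime(row["starttime"], FMT)
    end = datetime.strptime(row["endtime"], FMT)
    rows, cur = [], start
    while cur.replace(minute=0, second=0) < end.replace(minute=0, second=0):
        # close the current slice at hh:59:59 and move to the top of the next hour
        rows.append((row["Name"], row["city"], cur.strftime(FMT),
                     cur.replace(minute=59, second=59).strftime(FMT)))
        cur = (cur + timedelta(hours=1)).replace(minute=0, second=0)
    rows.append((row["Name"], row["city"], cur.strftime(FMT), end.strftime(FMT)))
    return rows

split_df = df.rdd.flatMap(split_by_hour).toDF(["Name", "city", "starttime", "endtime"])

That said, the same result can be produced without leaving the DataFrame API. The answer below computes the hour difference, uses posexplode to emit one row per hour, and then adjusts starttime/endtime for each slice: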
>>> from pyspark.sql.functions import *
>>> df.show()
+-----+------+-------------------+-------------------+
| Name| city| starttime| endtime|
+-----+------+-------------------+-------------------+
|user1|London|2019-08-02 03:34:45|2019-08-02 03:52:03|
|user2|Boston|2019-08-13 13:34:10|2019-08-13 15:02:10|
+-----+------+-------------------+-------------------+
>>> # diff = how many hour boundaries the interval crosses (assumes start and end
>>> # fall on the same day); repeat/split/posexplode then emits diff + 1 rows per
>>> # input row, with pos running from 0 to diff
>>> df1 = (df.withColumn("diff", (hour(col("endtime")) - hour(col("starttime"))).cast("int"))
...          .withColumn("loop", expr("split(repeat(':', diff), ':')"))
...          .select(col("*"), posexplode(col("loop")).alias("pos", "value"))
...          .drop("value", "loop"))
>>> df1.withColumn("starttime", when(col("pos") == 0, col("starttime")).otherwise(from_unixtime(unix_timestamp(col("starttime")) + (col("pos") * 3600) - minute(col("starttime"))*60 - second(col("starttime")))))
.withColumn("endtime", when((col("diff") - col("pos")) == 0, col("endtime")).otherwise(from_unixtime(unix_timestamp(col("endtime")) - ((col("diff") - col("pos")) * 3600) - minute(col("endtime"))*60 - second(col("endtime")) + lit(59) * lit(60) + lit(59))))
.drop("diff", "pos")
.show()
+-----+------+-------------------+-------------------+
| Name| city| starttime| endtime|
+-----+------+-------------------+-------------------+
|user1|London|2019-08-02 03:34:45|2019-08-02 03:52:03|
|user2|Boston|2019-08-13 13:34:10|2019-08-13 13:59:59|
|user2|Boston|2019-08-13 14:00:00|2019-08-13 14:59:59|
|user2|Boston|2019-08-13 15:00:00|2019-08-13 15:02:10|
+-----+------+-------------------+-------------------+
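Note that hour(endtime) - hour(starttime) only produces the right number of slices when both timestamps fall on the same calendar day. On Spark 2.4+, a variant built on sequence() sidesteps that by generating the hour boundaries directly (again only a sketch; the casts are there in case the columns are strings rather than timestamps):

from pyspark.sql import functions as F

# one row per hour boundary between the truncated start and end hours
hours = df.withColumn(
    "hour_start",
    F.explode(F.sequence(F.date_trunc("hour", F.col("starttime").cast("timestamp")),
                         F.date_trunc("hour", F.col("endtime").cast("timestamp")),
                         F.expr("INTERVAL 1 HOUR"))))

# clamp each hourly slice to the original interval
result = (hours
          .withColumn("starttime", F.greatest(F.col("starttime").cast("timestamp"), F.col("hour_start")))
          .withColumn("endtime", F.least(F.col("endtime").cast("timestamp"),
                                         F.col("hour_start") + F.expr("INTERVAL 59 MINUTES 59 SECONDS")))
          .drop("hour_start"))
result.show(truncate=False)

For the sample data this should give the same single row for user1 and the same three rows for user2.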