How to compute the diff of one column in a Spark DataFrame?
+-------------------+
| Dev_time|
+-------------------+
|2015-09-18 05:00:20|
|2015-09-18 05:00:21|
|2015-09-18 05:00:22|
|2015-09-18 05:00:23|
|2015-09-18 05:00:24|
|2015-09-18 05:00:25|
|2015-09-18 05:00:26|
|2015-09-18 05:00:27|
|2015-09-18 05:00:37|
|2015-09-18 05:00:37|
|2015-09-18 05:00:37|
|2015-09-18 05:00:38|
|2015-09-18 05:00:39|
+-------------------+
For a Spark DataFrame, I want to compute the difference between consecutive datetime values, like numpy.diff(array).
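For reference, numpy.diff returns the pairwise differences of consecutive elements (one element shorter than its input). A minimal sketch, using just the seconds component of the column above:

import numpy as np

# The seconds component of the Dev_time values shown above.
seconds = np.array([20, 21, 22, 23, 24, 25, 26, 27, 37, 37, 37, 38, 39])
print(np.diff(seconds))
## [ 1  1  1  1  1  1  1 10  0  0  1  1]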
Generally speaking, there is no efficient way to do this with Spark DataFrames, not to mention that things like ordering become quite tricky in a distributed setting. In theory you could use the lag function as follows:
from pyspark.sql.functions import lag, col, unix_timestamp
from pyspark.sql.window import Window

# Parse the strings into proper timestamps (unix_timestamp returns
# seconds since the epoch; casting that back yields a TimestampType).
dev_time = unix_timestamp(col("dev_time")).cast("timestamp")

# Assumes a SparkContext `sc`, as in the pyspark shell.
df = sc.parallelize([
    ("2015-09-18 05:00:20", ), ("2015-09-18 05:00:21", ),
    ("2015-09-18 05:00:22", ), ("2015-09-18 05:00:23", ),
    ("2015-09-18 05:00:24", ), ("2015-09-18 05:00:25", ),
    ("2015-09-18 05:00:26", ), ("2015-09-18 05:00:27", ),
    ("2015-09-18 05:00:37", ), ("2015-09-18 05:00:37", ),
    ("2015-09-18 05:00:37", ), ("2015-09-18 05:00:38", ),
    ("2015-09-18 05:00:39", )
]).toDF(["dev_time"]).withColumn("dev_time", dev_time)

# A window over the whole ordered frame (note: no PARTITION BY clause).
w = Window.orderBy("dev_time")

# The previous row's timestamp, cast to epoch seconds.
lag_dev_time = lag("dev_time").over(w).cast("integer")

# Difference in seconds between each row and the one before it.
diff = df.select((col("dev_time").cast("integer") - lag_dev_time).alias("diff"))
## diff.show()
## +----+
## |diff|
## +----+
## |null|
## | 1|
## | 1|
## | 1|
## | 1|
## | 1|
## | 1|
## | 1|
## | 10|
## ...
but it is extremely inefficient (a window function without a PARTITION BY clause moves all the data into a single partition). In practice, it makes more sense to use the sliding method on an RDD (Scala) or to implement your own sliding window (Python), as sketched after the link below. See:
- How to access Spark RDD Array of elements based on index
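A minimal sketch of a do-it-yourself sliding window in Python, emulated here with a shifted self-join on indexed records. It assumes the df defined above, and that the order produced by zipWithIndex matches the order you care about (true here, since the data was parallelized in order):

# Extract the timestamps as epoch seconds.
seconds = df.select(col("dev_time").cast("integer")).rdd.map(lambda r: r[0])

# Pair every value with its position ...
indexed = seconds.zipWithIndex().map(lambda x: (x[1], x[0]))   # (i, t_i)
# ... and shift the keys by one so row i can meet row i - 1.
shifted = indexed.map(lambda x: (x[0] + 1, x[1]))              # (i + 1, t_i)

# Joining on the index yields (i, (t_i, t_{i-1})); subtracting the two
# values gives the per-row diff without sorting into a single partition.
diff = indexed.join(shifted).sortByKey().mapValues(lambda v: v[0] - v[1])

## diff.values().collect()
## [1, 1, 1, 1, 1, 1, 1, 10, 0, 0, 1, 1]

The join still shuffles, but the work stays spread across partitions instead of collapsing onto one as the unpartitioned window does.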