Time series interpolation in Scala
I need to interpolate a time series in Scala.
The raw data is
2020-08-01, value1
2020-08-03, value3
and I want to interpolate the data for the in-between dates, like this
2020-08-01, value1
2020-08-02, value2
2020-08-03, value3
where value2 is the linear interpolation of value1 and value3.
Could someone help me with example code that does this in Scala Spark? For performance reasons I would rather avoid UDFs and use spark.range, but I am open to your best solution.
Thanks!
0. You can aggregate the dataframe to get its min and max dates, build a date sequence between them, and explode it to get one row per date, then left-join the original data back on.
from pyspark.sql.functions import *
from pyspark.sql import Window
w1 = Window.orderBy('date').rowsBetween(Window.unboundedPreceding, Window.currentRow)
w2 = Window.orderBy('date').rowsBetween(Window.currentRow, Window.unboundedFollowing)
df.groupBy().agg(min('date').alias('date_min'), max('date').alias('date_max')) \
  .withColumn('date', sequence(to_date('date_min'), to_date('date_max'))) \
  .withColumn('date', explode('date')) \
  .select('date') \
  .join(df, ['date'], 'left') \
  .show(10, False)
+----------+-----+
|date |value|
+----------+-----+
|2020-08-01|0 |
|2020-08-02|null |
|2020-08-03|null |
|2020-08-04|null |
|2020-08-05|null |
|2020-08-06|10 |
+----------+-----+
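Since the question asks for Scala Spark, a rough Scala equivalent of the snippet above might look like the following. It is an untested sketch that assumes the same df, with a DateType column date and a value column (if date is a string, wrap it with to_date as in the PySpark version):
import org.apache.spark.sql.functions._
// Build one row per calendar day between the min and max date,
// then left-join the original data so missing days get null values.
val allDates = df
  .agg(min("date").as("date_min"), max("date").as("date_max"))
  .select(explode(sequence(col("date_min"), col("date_max"))).as("date"))
allDates
  .join(df, Seq("date"), "left")
  .orderBy("date")
  .show(10, false)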
1. The simplest version, which only works for your exact case (a single missing day between two known values).
from pyspark.sql.functions import *
from pyspark.sql import Window
w1 = Window.orderBy('date').rowsBetween(Window.unboundedPreceding, Window.currentRow)
w2 = Window.orderBy('date').rowsBetween(Window.currentRow, Window.unboundedFollowing)
df.withColumn("value_m1", last('value', ignorenulls=True).over(w1)) \
.withColumn("value_p1", first('value', ignorenulls=True).over(w2)) \
.withColumn('value', coalesce(col('value'), expr('value_m1 + value_p1 / 2'))) \
.show(10, False)
+----------+-----+--------+--------+
|date |value|value_m1|value_p1|
+----------+-----+--------+--------+
|2020-08-01|0.0 |0 |0 |
|2020-08-02|5.0 |0 |10 |
|2020-08-03|10.0 |10 |10 |
+----------+-----+--------+--------+
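The same step in Scala could look roughly like this, again an untested sketch against the df assumed above. Note the midpoint is written as (value_m1 + value_p1) / 2 so that the division applies to the whole sum:
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
// Last known value up to the current row, and next known value from the current row on.
val w1 = Window.orderBy("date").rowsBetween(Window.unboundedPreceding, Window.currentRow)
val w2 = Window.orderBy("date").rowsBetween(Window.currentRow, Window.unboundedFollowing)
df.withColumn("value_m1", last("value", ignoreNulls = true).over(w1))
  .withColumn("value_p1", first("value", ignoreNulls = true).over(w2))
  // Fill the single missing day with the midpoint of its two neighbours.
  .withColumn("value", coalesce(col("value"), (col("value_m1") + col("value_p1")) / 2))
  .show(10, false)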
2. A slight improvement that handles gaps with an arbitrary number of consecutive null days. For example, when the dataframe is given by
+----------+-----+
|date |value|
+----------+-----+
|2020-08-01|0 |
|2020-08-02|null |
|2020-08-03|null |
|2020-08-04|null |
|2020-08-05|null |
|2020-08-06|10 |
|2020-08-07|null |
|2020-08-08|null |
+----------+-----+
then the code should be changed as follows:
from pyspark.sql.functions import *
from pyspark.sql import Window
w1 = Window.orderBy('date').rowsBetween(Window.unboundedPreceding, Window.currentRow)
w2 = Window.orderBy('date').rowsBetween(Window.currentRow, Window.unboundedFollowing)
w3 = Window.partitionBy('days_m1').orderBy('date')
w4 = Window.partitionBy('days_p1').orderBy(desc('date'))
df.withColumn("value_m1", last('value', ignorenulls=True).over(w1)) \
.withColumn("value_p1", first('value', ignorenulls=True).over(w2)) \
.withColumn('days_m1', count(when(col('value').isNotNull(), 1)).over(w1)) \
.withColumn('days_p1', count(when(col('value').isNotNull(), 1)).over(w2)) \
.withColumn('days_m1', count(lit(1)).over(w3) - 1) \
.withColumn('days_p1', count(lit(1)).over(w4) - 1) \
.withColumn('value', coalesce(col('value'), expr('(days_p1 * value_m1 + days_m1 * value_p1) / (days_m1 + days_p1)'))) \
.orderBy('date') \
.show(10, False)
+----------+-----+--------+--------+-------+-------+
|date |value|value_m1|value_p1|days_m1|days_p1|
+----------+-----+--------+--------+-------+-------+
|2020-08-01|0.0 |0 |0 |0 |0 |
|2020-08-02|2.0 |0 |10 |1 |4 |
|2020-08-03|4.0 |0 |10 |2 |3 |
|2020-08-04|6.0 |0 |10 |3 |2 |
|2020-08-05|8.0 |0 |10 |4 |1 |
|2020-08-06|10.0 |10 |10 |0 |0 |
|2020-08-07|null |10 |null |1 |1 |
|2020-08-08|null |10 |null |2 |0 |
+----------+-----+--------+--------+-------+-------+
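A Scala sketch of the same weighted interpolation, under the same assumptions about df, could look like the code below (untested). The dates after the last known value stay null, exactly as in the output above, because there is no forward value to interpolate towards:
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
val w1 = Window.orderBy("date").rowsBetween(Window.unboundedPreceding, Window.currentRow)
val w2 = Window.orderBy("date").rowsBetween(Window.currentRow, Window.unboundedFollowing)
val w3 = Window.partitionBy("days_m1").orderBy("date")
val w4 = Window.partitionBy("days_p1").orderBy(desc("date"))
df.withColumn("value_m1", last("value", ignoreNulls = true).over(w1))   // previous known value
  .withColumn("value_p1", first("value", ignoreNulls = true).over(w2))  // next known value
  // First pass: group id = number of known values seen so far, backwards / forwards.
  .withColumn("days_m1", count(when(col("value").isNotNull, 1)).over(w1))
  .withColumn("days_p1", count(when(col("value").isNotNull, 1)).over(w2))
  // Second pass: distance in rows to the previous / next known value.
  .withColumn("days_m1", count(lit(1)).over(w3) - 1)
  .withColumn("days_p1", count(lit(1)).over(w4) - 1)
  // Weighted average of the two neighbours, proportional to distance.
  .withColumn("value", coalesce(col("value"),
    expr("(days_p1 * value_m1 + days_m1 * value_p1) / (days_m1 + days_p1)")))
  .orderBy("date")
  .show(10, false)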