如何在 Pyspark 中通过滑动 window 随时间序列数据转换数据
How to transform data with sliding window over time series data in Pyspark
我正在尝试根据随时间序列数据滑动 window 来提取特征。
在 Scala 中,似乎有一个基于 this post and the documentation
的 sliding
函数
import org.apache.spark.mllib.rdd.RDDFunctions._
sc.parallelize(1 to 100, 10)
.sliding(3)
.map(curSlice => (curSlice.sum / curSlice.size))
.collect()
我的问题是PySpark中有类似的功能吗?或者如果没有这样的功能,我们如何实现类似的滑动window转换?
据我所知,sliding
函数在 Python 中不可用,并且 SlidingRDD
是私有的 class,无法在 MLlib
外部访问.
如果您要在现有的 RDD 上使用 sliding
,您可以像这样创建穷人的 sliding
:
def sliding(rdd, n):
assert n > 0
def gen_window(xi, n):
x, i = xi
return [(i - offset, (i, x)) for offset in xrange(n)]
return (
rdd.
zipWithIndex(). # Add index
flatMap(lambda xi: gen_window(xi, n)). # Generate pairs with offset
groupByKey(). # Group to create windows
# Sort values to ensure order inside window and drop indices
mapValues(lambda vals: [x for (i, x) in sorted(vals)]).
sortByKey(). # Sort to makes sure we keep original order
values(). # Get values
filter(lambda x: len(x) == n)) # Drop beginning and end
或者您可以尝试这样的操作(在 toolz
的帮助下)
from toolz.itertoolz import sliding_window, concat
def sliding2(rdd, n):
assert n > 1
def get_last_el(i, iter):
"""Return last n - 1 elements from the partition"""
return [(i, [x for x in iter][(-n + 1):])]
def slide(i, iter):
"""Prepend previous items and return sliding window"""
return sliding_window(n, concat([last_items.value[i - 1], iter]))
def clean_last_items(last_items):
"""Adjust for empty or to small partitions"""
clean = {-1: [None] * (n - 1)}
for i in range(rdd.getNumPartitions()):
clean[i] = (clean[i - 1] + list(last_items[i]))[(-n + 1):]
return {k: tuple(v) for k, v in clean.items()}
last_items = sc.broadcast(clean_last_items(
rdd.mapPartitionsWithIndex(get_last_el).collectAsMap()))
return rdd.mapPartitionsWithIndex(slide)
spark 1.4 具有 window 功能,如下所述:
https://databricks.com/blog/2015/07/15/introducing-window-functions-in-spark-sql.html
希望对您有所帮助,请告诉我。
要添加到 venuktan 的答案,这里是如何使用 Spark SQL 创建基于时间的滑动 window 并保留完整内容window,而不是对其进行汇总。在我将时间序列数据预处理为滑动 windows 以输入到 Spark ML 中的用例中,这是必需的。
这种方法的一个局限性是我们假设您希望随着时间的推移进行滑动 windows。
首先,您可以创建 Spark DataFrame,例如通过读取 CSV 文件:
df = spark.read.csv('foo.csv')
我们假设您的 CSV 文件有两列:一列是 unix 时间戳,另一列是您要从中提取滑动 windows 的列。
from pyspark.sql import functions as f
window_duration = '1000 millisecond'
slide_duration = '500 millisecond'
df.withColumn("_c0", f.from_unixtime(f.col("_c0"))) \
.groupBy(f.window("_c0", window_duration, slide_duration)) \
.agg(f.collect_list(f.array('_c1'))) \
.withColumnRenamed('collect_list(array(_c1))', 'sliding_window')
奖励:将此数组列转换为 Spark ML 所需的 DenseVector 格式,see the UDF approach here。
额外奖励:取消嵌套结果列,这样滑动的每个元素 window 都有自己的列,。
希望对您有所帮助,如果我能澄清任何问题,请告诉我。
我正在尝试根据随时间序列数据滑动 window 来提取特征。 在 Scala 中,似乎有一个基于 this post and the documentation
的sliding
函数
import org.apache.spark.mllib.rdd.RDDFunctions._
sc.parallelize(1 to 100, 10)
.sliding(3)
.map(curSlice => (curSlice.sum / curSlice.size))
.collect()
我的问题是PySpark中有类似的功能吗?或者如果没有这样的功能,我们如何实现类似的滑动window转换?
据我所知,sliding
函数在 Python 中不可用,并且 SlidingRDD
是私有的 class,无法在 MLlib
外部访问.
如果您要在现有的 RDD 上使用 sliding
,您可以像这样创建穷人的 sliding
:
def sliding(rdd, n):
assert n > 0
def gen_window(xi, n):
x, i = xi
return [(i - offset, (i, x)) for offset in xrange(n)]
return (
rdd.
zipWithIndex(). # Add index
flatMap(lambda xi: gen_window(xi, n)). # Generate pairs with offset
groupByKey(). # Group to create windows
# Sort values to ensure order inside window and drop indices
mapValues(lambda vals: [x for (i, x) in sorted(vals)]).
sortByKey(). # Sort to makes sure we keep original order
values(). # Get values
filter(lambda x: len(x) == n)) # Drop beginning and end
或者您可以尝试这样的操作(在 toolz
的帮助下)
from toolz.itertoolz import sliding_window, concat
def sliding2(rdd, n):
assert n > 1
def get_last_el(i, iter):
"""Return last n - 1 elements from the partition"""
return [(i, [x for x in iter][(-n + 1):])]
def slide(i, iter):
"""Prepend previous items and return sliding window"""
return sliding_window(n, concat([last_items.value[i - 1], iter]))
def clean_last_items(last_items):
"""Adjust for empty or to small partitions"""
clean = {-1: [None] * (n - 1)}
for i in range(rdd.getNumPartitions()):
clean[i] = (clean[i - 1] + list(last_items[i]))[(-n + 1):]
return {k: tuple(v) for k, v in clean.items()}
last_items = sc.broadcast(clean_last_items(
rdd.mapPartitionsWithIndex(get_last_el).collectAsMap()))
return rdd.mapPartitionsWithIndex(slide)
spark 1.4 具有 window 功能,如下所述: https://databricks.com/blog/2015/07/15/introducing-window-functions-in-spark-sql.html
希望对您有所帮助,请告诉我。
要添加到 venuktan 的答案,这里是如何使用 Spark SQL 创建基于时间的滑动 window 并保留完整内容window,而不是对其进行汇总。在我将时间序列数据预处理为滑动 windows 以输入到 Spark ML 中的用例中,这是必需的。
这种方法的一个局限性是我们假设您希望随着时间的推移进行滑动 windows。
首先,您可以创建 Spark DataFrame,例如通过读取 CSV 文件:
df = spark.read.csv('foo.csv')
我们假设您的 CSV 文件有两列:一列是 unix 时间戳,另一列是您要从中提取滑动 windows 的列。
from pyspark.sql import functions as f
window_duration = '1000 millisecond'
slide_duration = '500 millisecond'
df.withColumn("_c0", f.from_unixtime(f.col("_c0"))) \
.groupBy(f.window("_c0", window_duration, slide_duration)) \
.agg(f.collect_list(f.array('_c1'))) \
.withColumnRenamed('collect_list(array(_c1))', 'sliding_window')
奖励:将此数组列转换为 Spark ML 所需的 DenseVector 格式,see the UDF approach here。
额外奖励:取消嵌套结果列,这样滑动的每个元素 window 都有自己的列,
希望对您有所帮助,如果我能澄清任何问题,请告诉我。