如何每次window取一个记录子集?
How to take a subset of records per time window?
我有一个非常大的数据框(450000 行),其中包含传感器数据和时间戳,如下所示:
+--------+-----------+-----------+------------+-----------+
|Time [s]| Sensor1 | Sensor2 | Sensor3 | Sensor4 |
+--------+-----------+-----------+------------+-----------+
| 0.00198|-0.55999756|-0.19271851| 1.1320496| 1.373291|
| 0.00398| -1.2171936| 1.0081482| 0.25726318| 0.61035156|
| 0.00598|-0.29586792| 1.4437866| -1.1341858| 1.373291|
| 0.00798| 1.4489746| 0.39047241| -1.4620972|-0.30517578|
| 0.00998| 1.5341187| -1.1869812| -0.19256592|-0.15258789|
| 0.01198| 0.04196167| -1.2620544| 1.1372375| 0.45776367|
| 0.01398| -1.0899353| 0.19500732| 0.79772949| 1.8310547|
| 0.01598| -0.6300354| 0.77346802| -0.69030762| 0.61035156|
| 0.01798| 0.95153809| 0.40786743| -0.96694946| 0.0|
| 0.01998| 1.5705872|-0.75668335| 0.063323975| 0.91552734|
| 0.02198| 0.29678345| -1.4421082| 1.1439514| -1.0681152|
| 0.02398| -1.3595581|-0.25726318| 1.4170837| 0.45776367|
+--------+-----------+-----------+------------+-----------+
我需要在一段时间内将这些数据展平 window 并附加到列表中。
例如,如果 window 是 10 毫秒,那么我将从上面的每个传感器数据中取 5 个并添加到列表中,它看起来像这样:
[[-0.55999756, -0.19271851, 1.1320496, 1.373291, -1.2171936, 1.0081482, 0.25726318, 0.61035156, -0.29586792, 1.4437866, -1.1341858, 1.373291, 1.4489746, 0.39047241, -1.4620972, -0.30517578, 1.5341187, -1.1869812, -0.19256592, -0.15258789]
... ]
我目前正在使用以下代码实现此目的:
mylist=[]
df= df.withColumn("row", row_number().over(Window.orderBy(monotonically_increasing_id())))
for m in range(n+1, df.count()+n+1, n):
newdf= df.filter((col("row")>(m-n)) & (col("row")<m))
flatlist= newdf.select("Sensor1", "Sensor2", "Sensor3", "Sensor4").rdd.flatMap(lambda x: x).collect()
mylist.append(flatlist)
其中 m 和 n 是我 window 的边界。
这可行,但对于大型 window 和大型数据框,它需要永远(可能是因为 collect()?)。 是否有更有效的方法来获得相同的结果?
有了Pandas我可以做下面的事情,但是效率更高吗? (我宁愿用 Spark 来并行化)
pandasdf = df.toPandas()
flatlist=[pandasdf.values.flatten().tolist()]
tl;dr 使用 groupBy
运算符(可能使用 window
标准函数)后跟 collect_list
标准函数。
您可能希望使用用户定义的函数 (UDF) 从 collect_list
中取出前 5 个元素。
我没有使用 Python / pyspark,所以可以提供更多帮助。
这将 return 每 10 毫秒的数据 window。但是它将是数组数组。不确定您是否需要在单个阵列中使用它。
df = df.withColumn('sensorDataArr', F.array('Sensor1', 'Sensor2', 'Sensor3', 'Sensor4'))
df = df.withColumn('grpNum', F.floor(df.Time * 100))
df_g = df.groupBy('grpNum').agg(F.collect_list('sensorDataArr').alias('sensorData'))
我有一个非常大的数据框(450000 行),其中包含传感器数据和时间戳,如下所示:
+--------+-----------+-----------+------------+-----------+
|Time [s]| Sensor1 | Sensor2 | Sensor3 | Sensor4 |
+--------+-----------+-----------+------------+-----------+
| 0.00198|-0.55999756|-0.19271851| 1.1320496| 1.373291|
| 0.00398| -1.2171936| 1.0081482| 0.25726318| 0.61035156|
| 0.00598|-0.29586792| 1.4437866| -1.1341858| 1.373291|
| 0.00798| 1.4489746| 0.39047241| -1.4620972|-0.30517578|
| 0.00998| 1.5341187| -1.1869812| -0.19256592|-0.15258789|
| 0.01198| 0.04196167| -1.2620544| 1.1372375| 0.45776367|
| 0.01398| -1.0899353| 0.19500732| 0.79772949| 1.8310547|
| 0.01598| -0.6300354| 0.77346802| -0.69030762| 0.61035156|
| 0.01798| 0.95153809| 0.40786743| -0.96694946| 0.0|
| 0.01998| 1.5705872|-0.75668335| 0.063323975| 0.91552734|
| 0.02198| 0.29678345| -1.4421082| 1.1439514| -1.0681152|
| 0.02398| -1.3595581|-0.25726318| 1.4170837| 0.45776367|
+--------+-----------+-----------+------------+-----------+
我需要在一段时间内将这些数据展平 window 并附加到列表中。
例如,如果 window 是 10 毫秒,那么我将从上面的每个传感器数据中取 5 个并添加到列表中,它看起来像这样:
[[-0.55999756, -0.19271851, 1.1320496, 1.373291, -1.2171936, 1.0081482, 0.25726318, 0.61035156, -0.29586792, 1.4437866, -1.1341858, 1.373291, 1.4489746, 0.39047241, -1.4620972, -0.30517578, 1.5341187, -1.1869812, -0.19256592, -0.15258789]
... ]
我目前正在使用以下代码实现此目的:
mylist=[]
df= df.withColumn("row", row_number().over(Window.orderBy(monotonically_increasing_id())))
for m in range(n+1, df.count()+n+1, n):
newdf= df.filter((col("row")>(m-n)) & (col("row")<m))
flatlist= newdf.select("Sensor1", "Sensor2", "Sensor3", "Sensor4").rdd.flatMap(lambda x: x).collect()
mylist.append(flatlist)
其中 m 和 n 是我 window 的边界。
这可行,但对于大型 window 和大型数据框,它需要永远(可能是因为 collect()?)。 是否有更有效的方法来获得相同的结果?
有了Pandas我可以做下面的事情,但是效率更高吗? (我宁愿用 Spark 来并行化)
pandasdf = df.toPandas()
flatlist=[pandasdf.values.flatten().tolist()]
tl;dr 使用 groupBy
运算符(可能使用 window
标准函数)后跟 collect_list
标准函数。
您可能希望使用用户定义的函数 (UDF) 从 collect_list
中取出前 5 个元素。
我没有使用 Python / pyspark,所以可以提供更多帮助。
这将 return 每 10 毫秒的数据 window。但是它将是数组数组。不确定您是否需要在单个阵列中使用它。
df = df.withColumn('sensorDataArr', F.array('Sensor1', 'Sensor2', 'Sensor3', 'Sensor4'))
df = df.withColumn('grpNum', F.floor(df.Time * 100))
df_g = df.groupBy('grpNum').agg(F.collect_list('sensorDataArr').alias('sensorData'))