Problem/bug with list values read from a Spark dataframe when plotting spike detection using find_peaks from SciPy
Suppose I have the following pandas dataframe containing value over time (date):
import pandas as pd
pdf = pd.DataFrame(data={'date':['2020-10-16','2020-10-17','2020-10-18','2020-10-19','2020-10-20','2020-10-21','2020-10-22','2020-10-23','2020-10-24','2020-10-25','2020-10-26','2020-10-27','2020-10-28','2020-10-29','2020-10-30','2020-10-31','2020-11-01','2020-11-02','2020-11-03','2020-11-04','2020-11-05','2020-11-06','2020-11-07','2020-11-08','2020-11-09','2020-11-10','2020-11-11','2020-11-12','2020-11-13','2020-11-14','2020-11-15'],
'value':[161967, 161270, 148508, 152442, 157504, 157118, 155674, 134522, 213384, 163242, 217415, 221502, 146267, 143621, 145875, 139488, 104466, 94825, 143686, 151952, 161074, 161417, 135042, 148768, 131428, 127816, 151905, 180498, 177899, 193950, 12]})
pdf
Or I have the following Spark dataframe with similar data:
import pyspark.sql.types
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession, Row, SQLContext
from pyspark.sql import functions as F
from pyspark.sql.functions import *
from pyspark.sql.types import StructType,StructField, StringType, IntegerType, TimestampType, DateType
data = [('2020-10-16', 161967),
('2020-10-17', 161270),
('2020-10-18', 148508),
('2020-10-19', 152442),
('2020-10-20', 157504),
('2020-10-21', 157118),
('2020-10-22', 155674),
('2020-10-23', 134522),
('2020-10-24', 213384),
('2020-10-25', 163242),
('2020-10-26', 217415),
('2020-10-27', 221502),
('2020-10-28', 146267),
('2020-10-29', 143621),
('2020-10-30', 145875),
('2020-10-31', 139488),
('2020-11-01', 104466),
('2020-11-02', 94825),
('2020-11-03', 143686),
('2020-11-04', 151952),
('2020-11-05', 161074),
('2020-11-06', 161417),
('2020-11-07', 135042),
('2020-11-08', 148768),
('2020-11-09', 131428),
('2020-11-10', 127816),
('2020-11-11', 151905),
('2020-11-12', 180498),
('2020-11-13', 177899),
('2020-11-14', 193950),
('2020-11-15', 12),
]
schema = StructType([
    StructField("date", StringType(), True),
    StructField("value", IntegerType(), True),
])
# create a Spark dataframe
sc = SparkContext()
sqlContext = SQLContext(sc)
sdf = sqlContext.createDataFrame(data=data, schema=schema)
sdf.printSchema()
sdf.sort('date').show(truncate=False)
Inspired by this, I detect peaks and valleys with the following code:
from scipy.signal import find_peaks
import numpy as np
import matplotlib.pyplot as plt
# Input signal from Pandas dataframe
t = pdf.date
x = pdf.value
# Set thresholds: median ± 1 std
# (ideally the std would be computed on the 10-90 percentile data, without outliers)
thresh_top = np.median(x) + 1 * np.std(x)
thresh_bottom = np.median(x) - 1 * np.std(x)
# Find indices of peaks & of valleys (from inverting the signal)
peak_idx, _ = find_peaks(x, height=thresh_top)
valley_idx, _ = find_peaks(-x, height=-thresh_bottom)
# Plot signal
plt.figure(figsize=(14, 12))
plt.plot(t, x, color='b', label='data')
plt.scatter(t, x, s=10, c='b', label='value')
# Plot threshold
plt.plot([min(t), max(t)], [thresh_top, thresh_top], '--', color='r', label='peaks-threshold')
plt.plot([min(t), max(t)], [thresh_bottom, thresh_bottom], '--', color='g', label='valleys-threshold')
# Plot peaks (red) and valleys (green)
plt.plot(t[peak_idx], x[peak_idx], "x", color='r', label='peaks')
plt.plot(t[valley_idx], x[valley_idx], "x", color='g', label='valleys')
plt.xticks(rotation=45)
plt.ylabel('value')
plt.xlabel('timestamp')
plt.title('data over time')
plt.legend(loc='lower left')
plt.gcf().autofmt_xdate()
plt.show()
It works and plots successfully when I read the data from the pandas dataframe pdf, before creating the Spark dataframe. But once I have created the Spark dataframe sdf, the very same notebook cell that reads the input signal from the pandas pdf no longer works, even when re-run, and gives the following error:
---------------------------------------------------------------------------
TypeError Traceback (most recent call last)
<ipython-input-12-1c79b34272c5> in <module>()
23
24 # Plot threshold
---> 25 plt.plot([min(t), max(t)], [thresh_top, thresh_top], '--', color='r', label='peaks-threshold')
26 plt.plot([min(t), max(t)], [thresh_bottom, thresh_bottom], '--', color='g', label='valleys-threshold')
27
2 frames
/content/spark-3.1.2-bin-hadoop2.7/python/pyspark/sql/column.py in _to_java_column(col)
47 "{0} of type {1}. "
48 "For column literals, use 'lit', 'array', 'struct' or 'create_map' "
---> 49 "function.".format(col, type(col)))
50 return jcol
51
TypeError: Invalid argument, not a string or column: 0 2020-10-16
1 2020-10-17
2 2020-10-18
3 2020-10-19
4 2020-10-20
5 2020-10-21
6 2020-10-22
7 2020-10-23
8 2020-10-24
9 2020-10-25
10 2020-10-26
11 2020-10-27
12 2020-10-28
13 2020-10-29
14 2020-10-30
15 2020-10-31
16 2020-11-01
17 2020-11-02
18 2020-11-03
19 2020-11-04
20 2020-11-05
21 2020-11-06
22 2020-11-07
23 2020-11-08
24 2020-11-09
25 2020-11-10
26 2020-11-11
27 2020-11-12
28 2020-11-13
29 2020-11-14
30 2020-11-15
Name: date, dtype: object of type <class 'pandas.core.series.Series'>. For column literals, use 'lit', 'array', 'struct' or 'create_map' function.
When I try to read the data directly from the Spark dataframe using:
# Input signal from Spark dataframe
t = [val.date for val in sdf.select('date').collect()]
x = [val.value for val in sdf.select('value').collect()]
unfortunately the plotting code no longer works and throws the following error:
---------------------------------------------------------------------------
TypeError Traceback (most recent call last)
<ipython-input-14-9cfede77d0fb> in <module>()
31 # Find indices of peaks & of valleys (from inverting the signal)
32 peak_idx, _ = find_peaks(x, height = thresh_top)
---> 33 valley_idx, _ = find_peaks(-x, height = -thresh_bottom)
34
35
TypeError: bad operand type for unary -: 'list'
I have spent a lot of time on this but cannot fix the error. I am also open to other, non-find_peaks() solutions, e.g. a numpythonic one like this, if I can adapt it to the code and apply it to the Spark dataframe. I have tried many things, which you can check in this Google Colab Notebook for plotting spike detection; feel free to run/test/edit it for quick debugging.
The problem is that you are not working with the same kind of object.
When you use pandas and take x = pdf.value, you actually get a Series object. You can put a - in front of this object, and it knows it has to negate the values inside.
But when you use PySpark and collect the values, you get a plain list object. If you put a - in front of it, you get the error:
TypeError: bad operand type for unary -: 'list'
which tells you it does not know what to do with it.
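To see the difference in isolation, here is a minimal, self-contained sketch:
import pandas as pd
s = pd.Series([1, 2, 3])
print(-s)  # works: a Series negates element-wise
try:
    -[1, 2, 3]  # a plain Python list does not support unary minus
except TypeError as e:
    print(e)  # bad operand type for unary -: 'list'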
- So the first thing to do: instead of
valley_idx, _ = find_peaks(-x, height=-thresh_bottom)
you have to negate the values yourself, e.g.:
valley_idx, _ = find_peaks([-i for i in x], height=-thresh_bottom)
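(A list comprehension is enough here because find_peaks accepts any array-like input and converts it internally; the only operation that failed was the Python-level unary minus on the list.)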
- Next, find_peaks returns an ndarray of indices, which again cannot be used to index a list:
plt.plot(t[peak_idx], x[peak_idx], "x", color="r", label="peaks")
so you have to do the indexing manually, e.g.:
plt.plot(
[t[i] for i in peak_idx],
[x[i] for i in peak_idx],
"x",
color="r",
label="peaks",
)
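Alternatively (an equivalent option, not something the fixes above require), you can convert the collected lists to NumPy arrays once; arrays support both unary minus and indexing with an index array, so the rest of the original code works unchanged:
import numpy as np
# a minimal sketch, assuming t and x were already collected from the Spark dataframe as lists
t = np.asarray(t)
x = np.asarray(x)
valley_idx, _ = find_peaks(-x, height=-thresh_bottom)  # unary minus now works
plt.plot(t[peak_idx], x[peak_idx], "x", color="r", label="peaks")  # index-array indexing now works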
I have reproduced your plot with the following code (plus an example of computing the median and std_dev in PySpark):
# data is the same
# ...
# create a Spark dataframe
spark = SparkSession.builder.getOrCreate()
sdf = spark.createDataFrame(data=data, schema=schema)
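# compute the threshold statistics with Spark aggregations instead of NumPy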
std_dev = sdf.select(F.stddev(F.col("value")).alias("std")).collect()[0]["std"]
# global median of value (percentile_approx aggregated over the whole column, no groupBy needed)
median = (
    sdf.select(F.expr("percentile_approx(value, 0.5)").alias("med"))
    .collect()[0]["med"]
)
thresh_top = median + 1 * std_dev
thresh_bottom = median - 1 * std_dev
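# collect the dates and values to the driver as plain Python lists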
t = sdf.select("date").rdd.flatMap(lambda x: x).collect()
x = sdf.select("value").rdd.flatMap(lambda x: x).collect()
peak_idx, _ = find_peaks(x, height=thresh_top)
valley_idx, _ = find_peaks([-i for i in x], height=-thresh_bottom)
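# Plot signal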
plt.figure(figsize=(14, 12))
plt.plot(t, x, color="b", label="data")
plt.scatter(t, x, s=10, c="b", label="value")
# Plot threshold
plt.plot(
[min(t), max(t)],
[thresh_top, thresh_top],
"--",
color="r",
label="peaks-threshold",
)
plt.plot(
[min(t), max(t)],
[thresh_bottom, thresh_bottom],
"--",
color="g",
label="valleys-threshold",
)
# Plot peaks (red) and valleys (green)
plt.plot(
[t[i] for i in peak_idx],
[x[i] for i in peak_idx],
"x",
color="r",
label="peaks",
)
plt.plot(
[t[i] for i in valley_idx],
[x[i] for i in valley_idx],
"x",
color="g",
label="valleys",
)
# ...
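As a side note (my suggestion, not part of the code above): if the data fits in the driver's memory, you can also convert the Spark dataframe to pandas with toPandas() and reuse the original pandas-based code unchanged, since the columns come back as Series objects:
# a minimal sketch, assuming sdf from above and data small enough for the driver
pdf2 = sdf.toPandas()  # pdf2 is a hypothetical name for the converted frame
t = pdf2.date          # pandas Series again, so unary minus and ndarray indexing work
x = pdf2.value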