Problem/bug with list values reading from Spark dataframe during plotting spikes detection using find_peaks from SciPy

Suppose I have the following pandas dataframe containing a value per date over time:

import pandas as pd

pdf = pd.DataFrame(data={'date':['2020-10-16','2020-10-17','2020-10-18','2020-10-19','2020-10-20','2020-10-21','2020-10-22','2020-10-23','2020-10-24','2020-10-25','2020-10-26','2020-10-27','2020-10-28','2020-10-29','2020-10-30','2020-10-31','2020-11-01','2020-11-02','2020-11-03','2020-11-04','2020-11-05','2020-11-06','2020-11-07','2020-11-08','2020-11-09','2020-11-10','2020-11-11','2020-11-12','2020-11-13','2020-11-14','2020-11-15'],
                        'value':[161967, 161270, 148508, 152442, 157504, 157118, 155674, 134522, 213384, 163242, 217415, 221502, 146267, 143621, 145875, 139488, 104466, 94825, 143686, 151952, 161074, 161417, 135042, 148768, 131428, 127816, 151905, 180498, 177899, 193950, 12]})
pdf

Or I have the following Spark dataframe with similar data:

import pyspark.sql.types
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession, Row, SQLContext
from pyspark.sql import functions as F
from pyspark.sql.functions import *
from pyspark.sql.types import StructType,StructField, StringType, IntegerType, TimestampType, DateType

dict  = [ ('2020-10-16', 161967),
          ('2020-10-17', 161270),
          ('2020-10-18', 148508),
          ('2020-10-19', 152442),
          ('2020-10-20', 157504),
          ('2020-10-21', 157118),
          ('2020-10-22', 155674),
          ('2020-10-23', 134522),
          ('2020-10-24', 213384),
          ('2020-10-25', 163242),
          ('2020-10-26', 217415),
          ('2020-10-27', 221502),
          ('2020-10-28', 146267),
          ('2020-10-29', 143621),
          ('2020-10-30', 145875),
          ('2020-10-31', 139488),
          ('2020-11-01', 104466),
          ('2020-11-02', 94825),
          ('2020-11-03', 143686),
          ('2020-11-04', 151952),
          ('2020-11-05', 161074),
          ('2020-11-06', 161417),
          ('2020-11-07', 135042),
          ('2020-11-08', 148768),
          ('2020-11-09', 131428),
          ('2020-11-10', 127816),
          ('2020-11-11', 151905),
          ('2020-11-12', 180498),
          ('2020-11-13', 177899),
          ('2020-11-14', 193950),
          ('2020-11-15', 12),

  ]

schema = StructType([ 
    StructField("date",        StringType(),    True), \
    StructField("value",       IntegerType(),   True), \
  ])
 
#create a Spark dataframe
sc= SparkContext()
sqlContext = SQLContext(sc)
sdf = sqlContext.createDataFrame(data=dict,schema=schema)
sdf.printSchema()
sdf.sort('date').show(truncate = False)

Inspired by this, I detect peaks and valleys with the following code:

from scipy.signal import find_peaks
import numpy as np
import matplotlib.pyplot as plt

# Input signal from Pandas dataframe
t = pdf.date
x = pdf.value

# Set thresholds
# std calculated on 10-90 percentile data, without outliers is used for threshold
thresh_top    = np.median(x) + 1 * np.std(x)
thresh_bottom = np.median(x) - 1 * np.std(x)


# Find indices of peaks & of valleys (from inverting the signal)
peak_idx, _   = find_peaks(x,  height =  thresh_top)
valley_idx, _ = find_peaks(-x, height = -thresh_bottom)

# Plot signal
plt.figure(figsize=(14,12))
plt.plot(t, x   , color='b', label='data')
plt.scatter(t, x, s=10,c='b',label='value')

# Plot threshold
plt.plot([min(t), max(t)], [thresh_top, thresh_top],       '--',  color='r', label='peaks-threshold')
plt.plot([min(t), max(t)], [thresh_bottom, thresh_bottom], '--',  color='g', label='valleys-threshold')

# Plot peaks (red) and valleys (blue)
plt.plot(t[peak_idx],   x[peak_idx],   "x", color='r', label='peaks')
plt.plot(t[valley_idx], x[valley_idx], "x", color='g', label='valleys')


plt.xticks(rotation=45)
plt.ylabel('value')
plt.xlabel('timestamp')
plt.title(f'data over time')
plt.legend( loc='lower left')
plt.gcf().autofmt_xdate()
plt.show()

When I read the data from the pandas dataframe pdf before creating the Spark dataframe, everything works and plots successfully. As soon as I create the Spark dataframe sdf, even re-running the very same notebook cell that reads the input signal from the pandas pdf no longer works and gives the following error:

---------------------------------------------------------------------------
TypeError                                 Traceback (most recent call last)
<ipython-input-12-1c79b34272c5> in <module>()
     23 
     24 # Plot threshold
---> 25 plt.plot([min(t), max(t)], [thresh_top, thresh_top],       '--',  color='r', label='peaks-threshold')
     26 plt.plot([min(t), max(t)], [thresh_bottom, thresh_bottom], '--',  color='g', label='valleys-threshold')
     27 

2 frames
/content/spark-3.1.2-bin-hadoop2.7/python/pyspark/sql/column.py in _to_java_column(col)
     47             "{0} of type {1}. "
     48             "For column literals, use 'lit', 'array', 'struct' or 'create_map' "
---> 49             "function.".format(col, type(col)))
     50     return jcol
     51 

TypeError: Invalid argument, not a string or column: 0     2020-10-16
1     2020-10-17
2     2020-10-18
3     2020-10-19
4     2020-10-20
5     2020-10-21
6     2020-10-22
7     2020-10-23
8     2020-10-24
9     2020-10-25
10    2020-10-26
11    2020-10-27
12    2020-10-28
13    2020-10-29
14    2020-10-30
15    2020-10-31
16    2020-11-01
17    2020-11-02
18    2020-11-03
19    2020-11-04
20    2020-11-05
21    2020-11-06
22    2020-11-07
23    2020-11-08
24    2020-11-09
25    2020-11-10
26    2020-11-11
27    2020-11-12
28    2020-11-13
29    2020-11-14
30    2020-11-15
Name: date, dtype: object of type <class 'pandas.core.series.Series'>. For column literals, use 'lit', 'array', 'struct' or 'create_map' function.

When I try to read the data directly from the Spark dataframe using:

# Input signal from Spark dataframe
t = [val.date  for val in sdf.select('date').collect()]
x = [val.value for val in sdf.select('value').collect()]

unfortunately the plotting code does not work either and throws the following error:

---------------------------------------------------------------------------
TypeError                                 Traceback (most recent call last)
<ipython-input-14-9cfede77d0fb> in <module>()
     31 # Find indices of peaks & of valleys (from inverting the signal)
     32 peak_idx, _   = find_peaks(x,  height =  thresh_top)
---> 33 valley_idx, _ = find_peaks(-x, height = -thresh_bottom)
     34 
     35 

TypeError: bad operand type for unary -: 'list'

I have spent a lot of time on this but cannot fix the error. I am also open to other, non-find_peaks() solutions, e.g. a numpythonic one like this, if I can adapt them to the code and apply them to the Spark dataframe. I have tried many things, which you can check in this Google Colab Notebook for plotting spike detection; feel free to run/test/edit it for quick debugging.

The problem is that you are not working with the same kind of object.

When you use pandas and take x = pdf.value, you actually get a Series object. You can put a - in front of it, and it knows it has to negate the values inside.

But when you use PySpark and collect the values, you get a plain Python list, and if you put a - in front of it you get the error:

TypeError: bad operand type for unary -: 'list'

which tells you that Python does not know how to negate a list.
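
A minimal illustration of the difference (toy values, not the question's data):

import pandas as pd

s = pd.Series([1, 2, 3])
print(-s)      # works: a Series negates element-wise

lst = [1, 2, 3]
print(-lst)    # TypeError: bad operand type for unary -: 'list'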

  • So the first thing to do: instead of
valley_idx, _ = find_peaks(-x, height=-thresh_bottom)

you have to negate the values yourself, for example:

valley_idx, _ = find_peaks([-i for i in x], height=-thresh_bottom)

  • Next, find_peaks returns an ndarray, which again cannot be used to index a list:
plt.plot(t[peak_idx], x[peak_idx], "x", color="r", label="peaks")

so you have to do the indexing manually, for example:

plt.plot(
        [t[i] for i in peak_idx],
        [x[i] for i in peak_idx],
        "x",
        color="r",
        label="peaks",
    )
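
Alternatively (just a sketch, not what the answer code below does): if you wrap the collected lists in NumPy arrays once, both the unary minus and the ndarray-based indexing from the original pandas code work unchanged:

import numpy as np

# hypothetical alternative: convert the collected lists to arrays once
t = np.array([val.date  for val in sdf.select('date').collect()])
x = np.array([val.value for val in sdf.select('value').collect()])

# unary minus and fancy indexing now behave like the pandas Series did
peak_idx, _   = find_peaks(x,  height=thresh_top)
valley_idx, _ = find_peaks(-x, height=-thresh_bottom)
plt.plot(t[peak_idx], x[peak_idx], "x", color="r", label="peaks")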

I have reproduced your plot with the following code (plus, as an example, the median and std_dev computed in PySpark):

# data is the same
# ...

# create a Spark dataframe
spark = SparkSession.builder.getOrCreate()
sdf = spark.createDataFrame(data=data, schema=schema)
std_dev = sdf.select(F.stddev(F.col("value")).alias("std")).collect()[0]["std"]
# median of the whole column (percentile_approx over all rows, no groupBy)
median = (
    sdf.agg(F.expr("percentile_approx(value, 0.5)").alias("med"))
    .collect()[0]["med"]
)
thresh_top = median + 1 * std_dev
thresh_bottom = median - 1 * std_dev
t = sdf.select("date").rdd.flatMap(lambda x: x).collect()
x = sdf.select("value").rdd.flatMap(lambda x: x).collect()
peak_idx, _ = find_peaks(x, height=thresh_top)
valley_idx, _ = find_peaks([-i for i in x], height=-thresh_bottom)

plt.figure(figsize=(14, 12))
plt.plot(t, x, color="b", label="data")
plt.scatter(t, x, s=10, c="b", label="value")

# Plot threshold
plt.plot(
    [min(t), max(t)],
    [thresh_top, thresh_top],
    "--",
    color="r",
    label="peaks-threshold",
)
plt.plot(
    [min(t), max(t)],
    [thresh_bottom, thresh_bottom],
    "--",
    color="g",
    label="valleys-threshold",
)

# Plot peaks (red) and valleys (blue)
plt.plot(
    [t[i] for i in peak_idx],
    [x[i] for i in peak_idx],
    "x",
    color="r",
    label="peaks",
)
plt.plot(
    [t[i] for i in valley_idx],
    [x[i] for i in valley_idx],
    "x",
    color="g",
    label="valleys",
)

# ...
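
As a final note (my own assumption, not part of the answer above): if the data fits in driver memory, you can also avoid the list handling entirely by converting the Spark dataframe back to pandas with toPandas(), after which the original pandas-based plotting code runs unchanged:

# hypothetical shortcut, assuming the data fits in driver memory
pdf2 = sdf.toPandas()          # Spark DataFrame -> pandas DataFrame
t = pdf2.date                  # pandas Series again, so -x and x[peak_idx] work
x = pdf2.value
peak_idx, _   = find_peaks(x,  height=thresh_top)
valley_idx, _ = find_peaks(-x, height=-thresh_bottom)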