从正弦数据中查找异常值

Finding anomalous values from sinusoidal data

如何从以下数据中找出异常值。我正在模拟正弦曲线模式。虽然我可以绘制数据并发现数据中的任何异常或噪声,但如果不绘制数据我怎么能做到这一点。我正在寻找机器学习方法以外的简单方法。

import random 
import numpy as np 
import matplotlib.pyplot as plt 

N = 10                  # Set signal sample length
t1 = -np.pi             # Simulation begins at t1
t2 =  np.pi;            # Simulation  ends  at t2

in_array = np.linspace(t1, t2, N)
print("in_array : ", in_array)
out_array = np.sin(in_array)

plt.plot(in_array, out_array, color = 'red', marker = "o") ; plt.title("numpy.sin()")

注入随机噪声

noise_input = random.uniform(-.5, .5); print("Noise : ",noise_input)

in_array[random.randint(0,len(in_array)-1)] = noise_input
print(in_array)

plt.plot(in_array, out_array, color = 'red', marker = "o") ; plt.title("numpy.sin()")

有噪声的数据

您的问题取决于时间向量(一维)。您需要对该向量应用某种过滤器。

首先想到的是 scipymedfilt(中值滤波器),它看起来像这样:

from scipy.signal import medfilt
l1 = [0, 10, 20, 30, 2, 50, 70, 15, 90, 100]
l2 = medfilt(l1)
print(l2)

这个输出将是:

[ 0. 10. 20. 20. 30. 50. 50. 70. 90. 90.]

这个过滤器的问题是,如果我们将一些噪声值应用到向量的边缘,如 [200, 0, 10, 20, 30, 2, 50, 70, 15, 90, 100, -50],那么输出将类似于 [ 0. 10. 10. 20. 20. 30. 50. 50. 70. 90. 90. 0.],显然这不适用于正弦图,因为它将为正弦值数组生成相同的伪像。

解决此问题的更好方法是将时间向量视为 y 输出,将其索引值视为 x 输入,并对 "time linear function",不是引号,它只是意味着我们通过应用伪造的 X 向量来伪造二维模型。该代码暗示使用 scipylinregress(线性回归)函数:

from scipy.stats import linregress
l1 = [5, 0, 10, 20, 30, -20, 50, 70, 15, 90, 100]
l1_x = range(0, len(l1))

slope, intercept, r_val, p_val, std_err = linregress(l1_x, l1)
l1 = intercept + slope * l1_x

print(l1)

其输出将是:

[-10.45454545  -1.63636364   7.18181818  16.          24.81818182
  33.63636364  42.45454545  51.27272727  60.09090909  68.90909091
  77.72727273]

现在让我们将其应用于您的时间向量。

import random 
import numpy as np 
import pandas as pd
import matplotlib.pyplot as plt 
from scipy.stats import linregress

N = 20
# N = 10                  # Set signal sample length
t1 = -np.pi             # Simulation begins at t1
t2 =  np.pi;            # Simulation  ends  at t2

in_array = np.linspace(t1, t2, N)

# add some noise
noise_input = random.uniform(-.5, .5);
in_array[random.randint(0, len(in_array)-1)] = noise_input

# apply filter on time array
in_array_x = range(0, len(in_array))

slope, intercept, r_val, p_val, std_err = linregress(in_array_x, in_array)
in_array = intercept + slope * in_array_x

# generate sine wave
out_array = np.sin(in_array)
print("OUT ARRAY")
print(out_array)

plt.plot(in_array, out_array, color = 'red', marker = "o") ; plt.title("numpy.sin()")

plt.show()

输出将是:

生成的信号将是原始信号的近似值,就像任何形式的 extrapolation/interpolation/regression 过滤一样。

我想到了以下方法来解决你的问题,因为你只有一些值在时间向量中是异常的,这意味着其余的值有规律的进展,这意味着如果我们收集集群下向量中的所有数据点并计算最大集群的平均步长(它本质上是代表真实交易的值池),然后我们可以使用该平均值在给定阈值下进行三元组检测,在向量上并检测哪些元素是异常的。

为此我们需要两个函数:calculate_average_step 计算最大的接近值簇的平均值,然后我们需要 detect_anomalous_values 产生异常值的索引向量,基于之前计算的平均值。

检测到异常值后,我们可以继续将它们替换为估计值,我们可以根据平均步长值和使用向量中的相邻点来确定该值。

import random 
import numpy as np 
import pandas as pd
import matplotlib.pyplot as plt 


def calculate_average_step(array, threshold=5):
    """
    Determine the average step by doing a weighted average based on clustering of averages.
    array: our array
    threshold: the +/- offset for grouping clusters. Aplicable on all elements in the array. 
    """

    # determine all the steps
    steps = []
    for i in range(0, len(array) - 1):
        steps.append(abs(array[i] - array[i+1]))

    # determine the steps clusters
    clusters = []
    skip_indexes = []
    cluster_index = 0

    for i in range(len(steps)):
        if i in skip_indexes:
            continue

        # determine the cluster band (based on threshold)
        cluster_lower = steps[i] - (steps[i]/100) * threshold
        cluster_upper = steps[i] + (steps[i]/100) * threshold

        # create the new cluster
        clusters.append([])
        clusters[cluster_index].append(steps[i])

        # try to match elements from the rest of the array
        for j in range(i + 1, len(steps)):

            if not (cluster_lower <= steps[j] <= cluster_upper):
                continue

            clusters[cluster_index].append(steps[j])
            skip_indexes.append(j)

        cluster_index += 1  # increment the cluster id

    clusters = sorted(clusters, key=lambda x: len(x), reverse=True)
    biggest_cluster = clusters[0] if len(clusters) > 0 else None

    if biggest_cluster is None:
        return None

    return sum(biggest_cluster) / len(biggest_cluster)  # return our most common average


def detect_anomalous_values(array, regular_step, threshold=5):
    """
    Will scan every triad (3 points) in the array to detect anomalies.
    array: the array to iterate over.
    regular_step: the step around which we form the upper/lower band for filtering
    treshold: +/- variation between the steps of the first and median element and median and third element.
    """
    assert(len(array) >= 3)  # must have at least 3 elements

    anomalous_indexes = []

    step_lower = regular_step - (regular_step / 100) * threshold
    step_upper = regular_step + (regular_step / 100) * threshold

    # detection will be forward from i (hence 3 elements must be available for the d)
    for i in range(0, len(array) - 2):
        a = array[i]
        b = array[i+1]
        c = array[i+2]

        first_step = abs(a-b)
        second_step = abs(b-c)

        first_belonging = step_lower <= first_step <= step_upper
        second_belonging = step_lower <= second_step <= step_upper

        # detect that both steps are alright
        if first_belonging and second_belonging:
            continue  # all is good here, nothing to do

        # detect if the first point in the triad is bad
        if not first_belonging and second_belonging:
            anomalous_indexes.append(i)

        # detect the last point in the triad is bad
        if first_belonging and not second_belonging:
            anomalous_indexes.append(i+2)

        # detect the mid point in triad is bad (or everything is bad)
        if not first_belonging and not second_belonging:
            anomalous_indexes.append(i+1)
            # we won't add here the others because they will be detected by
            # the rest of the triad scans

    return sorted(set(anomalous_indexes))  # return unique indexes

if __name__ == "__main__":

    N = 10                  # Set signal sample length
    t1 = -np.pi             # Simulation begins at t1
    t2 =  np.pi;            # Simulation  ends  at t2

    in_array = np.linspace(t1, t2, N)

    # add some noise
    noise_input = random.uniform(-.5, .5);
    in_array[random.randint(0, len(in_array)-1)] = noise_input
    noisy_out_array = np.sin(in_array)

    # display noisy sin
    plt.figure()
    plt.plot(in_array, noisy_out_array, color = 'red', marker = "o");
    plt.title("noisy numpy.sin()")

    # detect anomalous values
    average_step = calculate_average_step(in_array)
    anomalous_indexes = detect_anomalous_values(in_array, average_step)

    # replace anomalous points with an estimated value based on our calculated average
    for anomalous in anomalous_indexes:

        # try forward extrapolation
        try:
            in_array[anomalous] = in_array[anomalous-1] + average_step
        # else try backwward extrapolation
        except IndexError:
            in_array[anomalous] = in_array[anomalous+1] - average_step

    # generate sine wave
    out_array = np.sin(in_array)

    plt.figure()
    plt.plot(in_array, out_array, color = 'green', marker = "o");
    plt.title("cleaned numpy.sin()")

    plt.show()

正弦噪声:

清理后的正弦: