从时间序列中提取模式
Extracting patterns from time-series
我有一个时间序列,它本质上相当于某种仪器在进行“检测”时记录当前时间。因此,采样率不是恒定时间,但是我们可以通过“重新采样”来对待它,这取决于可靠地进行检测这一事实,我们可以简单地插入 0 来“填充”间隙。这对以后很重要...
仪器应该检测附近另一台仪器发出的“信号”。第二台仪器在某个未知周期发出信号,T
(例如每秒 1 个信号),“抖动”可能约为周期的十分之几。
我的目标是仅使用“检测”仪器记录的时间戳来确定此周期(或频率,如果您愿意)。然而,不幸的是,检测器充满了噪声,并且大量(我估计 97-98%
)“检测”(以及时间序列中的“点”)是由于噪声造成的。因此,提取周期需要更仔细的分析。
我的第一个想法是简单地将时间序列输入 FFT 算法(我正在使用 FFTW
/DHT
),但这并不是特别有启发性。我也尝试了我自己的(诚然相当粗糙)算法,它简单地计算了该系列与增加周期的“干净”系列的互相关。我也没有在这方面走得太远,还有很多细节需要考虑(阶段等)。
我突然想到以前一定有人做过这样的事情,而且肯定有一种“不错”的方法来完成它。
这是我的方法。给定一个周期,我们可以使用动态程序对其进行评分,以找到包括第一次和最后一次检测在内的检测时间的子序列,并最大化间隙对数似然之和,其中间隙对数似然定义为减去间隙和周期的差异(高斯抖动模型)。
如果我们有大致正确的周期,那么我们可以得到一个很好的空位序列(开始和结束有些奇怪,哪里有漏检,但这没关系)。
如果我们有错误的周期,那么我们最终会得到基本上指数抖动,其对数似然性很低。
下面的 C++ 生成带有植入周期的假检测时间,然后搜索周期。分数由泊松噪声分数的(错误)估计值归一化,因此错误周期的分数约为 0.4。见下图。
#include <algorithm>
#include <cmath>
#include <iostream>
#include <limits>
#include <random>
#include <vector>
namespace {
static constexpr double kFalseNegativeRate = 0.01;
static constexpr double kCoefficientOfVariation = 0.003;
static constexpr int kSignals = 6000;
static constexpr int kNoiseToSignalRatio = 50;
template <class URNG>
std::vector<double> FakeTimes(URNG &g, const double period) {
std::vector<double> times;
std::bernoulli_distribution false_negative(kFalseNegativeRate);
std::uniform_real_distribution<double> phase(0, period);
double signal = phase(g);
std::normal_distribution<double> interval(period,
kCoefficientOfVariation * period);
std::uniform_real_distribution<double> noise(0, kSignals * period);
for (int i = 0; i < kSignals; i++) {
if (!false_negative(g)) {
times.push_back(signal);
}
signal += interval(g);
for (double j = 0; j < kNoiseToSignalRatio; j++) {
times.push_back(noise(g));
}
}
std::sort(times.begin(), times.end());
return times;
}
constexpr double Square(const double x) { return x * x; }
struct Subsequence {
double score;
int previous;
};
struct Result {
double score = std::numeric_limits<double>::quiet_NaN();
double median_interval = std::numeric_limits<double>::quiet_NaN();
};
Result Score(const std::vector<double> ×, const double period) {
if (times.empty() || !std::is_sorted(times.begin(), times.end())) {
return {};
}
std::vector<Subsequence> bests;
bests.reserve(times.size());
bests.push_back({0, -1});
for (int i = 1; i < times.size(); i++) {
Subsequence best = {std::numeric_limits<double>::infinity(), -1};
for (int j = i - 1; j > -1; j--) {
const double difference = times[i] - times[j];
const double penalty = Square(difference - period);
if (difference >= period && penalty >= best.score) {
break;
}
const Subsequence candidate = {bests[j].score + penalty, j};
if (candidate.score < best.score) {
best = candidate;
}
}
bests.push_back(best);
}
std::vector<double> intervals;
int i = bests.size() - 1;
while (true) {
int previous_i = bests[i].previous;
if (previous_i < 0) {
break;
}
intervals.push_back(times[i] - times[previous_i]);
i = previous_i;
}
if (intervals.empty()) {
return {};
}
const double duration = times.back() - times.front();
// The rate is doubled because we can look for a time in either direction.
const double rate = 2 * (times.size() - 1) / duration;
// Mean of the square of an exponential distribution with the given rate.
const double mean_square = 2 / Square(rate);
const double score = bests.back().score / (intervals.size() * mean_square);
const auto median_interval = intervals.begin() + intervals.size() / 2;
std::nth_element(intervals.begin(), median_interval, intervals.end());
return {score, *median_interval};
}
} // namespace
int main() {
std::default_random_engine g;
const auto times = FakeTimes(g, std::sqrt(2));
for (int i = 0; i < 2000; i++) {
const double period = std::pow(1.001, i) / 3;
const Result result = Score(times, period);
std::cout << period << ' ' << result.score << ' ' << result.median_interval
<< std::endl;
}
}
我有一个时间序列,它本质上相当于某种仪器在进行“检测”时记录当前时间。因此,采样率不是恒定时间,但是我们可以通过“重新采样”来对待它,这取决于可靠地进行检测这一事实,我们可以简单地插入 0 来“填充”间隙。这对以后很重要...
仪器应该检测附近另一台仪器发出的“信号”。第二台仪器在某个未知周期发出信号,T
(例如每秒 1 个信号),“抖动”可能约为周期的十分之几。
我的目标是仅使用“检测”仪器记录的时间戳来确定此周期(或频率,如果您愿意)。然而,不幸的是,检测器充满了噪声,并且大量(我估计 97-98%
)“检测”(以及时间序列中的“点”)是由于噪声造成的。因此,提取周期需要更仔细的分析。
我的第一个想法是简单地将时间序列输入 FFT 算法(我正在使用 FFTW
/DHT
),但这并不是特别有启发性。我也尝试了我自己的(诚然相当粗糙)算法,它简单地计算了该系列与增加周期的“干净”系列的互相关。我也没有在这方面走得太远,还有很多细节需要考虑(阶段等)。
我突然想到以前一定有人做过这样的事情,而且肯定有一种“不错”的方法来完成它。
这是我的方法。给定一个周期,我们可以使用动态程序对其进行评分,以找到包括第一次和最后一次检测在内的检测时间的子序列,并最大化间隙对数似然之和,其中间隙对数似然定义为减去间隙和周期的差异(高斯抖动模型)。
如果我们有大致正确的周期,那么我们可以得到一个很好的空位序列(开始和结束有些奇怪,哪里有漏检,但这没关系)。
如果我们有错误的周期,那么我们最终会得到基本上指数抖动,其对数似然性很低。
下面的 C++ 生成带有植入周期的假检测时间,然后搜索周期。分数由泊松噪声分数的(错误)估计值归一化,因此错误周期的分数约为 0.4。见下图。
#include <algorithm>
#include <cmath>
#include <iostream>
#include <limits>
#include <random>
#include <vector>
namespace {
static constexpr double kFalseNegativeRate = 0.01;
static constexpr double kCoefficientOfVariation = 0.003;
static constexpr int kSignals = 6000;
static constexpr int kNoiseToSignalRatio = 50;
template <class URNG>
std::vector<double> FakeTimes(URNG &g, const double period) {
std::vector<double> times;
std::bernoulli_distribution false_negative(kFalseNegativeRate);
std::uniform_real_distribution<double> phase(0, period);
double signal = phase(g);
std::normal_distribution<double> interval(period,
kCoefficientOfVariation * period);
std::uniform_real_distribution<double> noise(0, kSignals * period);
for (int i = 0; i < kSignals; i++) {
if (!false_negative(g)) {
times.push_back(signal);
}
signal += interval(g);
for (double j = 0; j < kNoiseToSignalRatio; j++) {
times.push_back(noise(g));
}
}
std::sort(times.begin(), times.end());
return times;
}
constexpr double Square(const double x) { return x * x; }
struct Subsequence {
double score;
int previous;
};
struct Result {
double score = std::numeric_limits<double>::quiet_NaN();
double median_interval = std::numeric_limits<double>::quiet_NaN();
};
Result Score(const std::vector<double> ×, const double period) {
if (times.empty() || !std::is_sorted(times.begin(), times.end())) {
return {};
}
std::vector<Subsequence> bests;
bests.reserve(times.size());
bests.push_back({0, -1});
for (int i = 1; i < times.size(); i++) {
Subsequence best = {std::numeric_limits<double>::infinity(), -1};
for (int j = i - 1; j > -1; j--) {
const double difference = times[i] - times[j];
const double penalty = Square(difference - period);
if (difference >= period && penalty >= best.score) {
break;
}
const Subsequence candidate = {bests[j].score + penalty, j};
if (candidate.score < best.score) {
best = candidate;
}
}
bests.push_back(best);
}
std::vector<double> intervals;
int i = bests.size() - 1;
while (true) {
int previous_i = bests[i].previous;
if (previous_i < 0) {
break;
}
intervals.push_back(times[i] - times[previous_i]);
i = previous_i;
}
if (intervals.empty()) {
return {};
}
const double duration = times.back() - times.front();
// The rate is doubled because we can look for a time in either direction.
const double rate = 2 * (times.size() - 1) / duration;
// Mean of the square of an exponential distribution with the given rate.
const double mean_square = 2 / Square(rate);
const double score = bests.back().score / (intervals.size() * mean_square);
const auto median_interval = intervals.begin() + intervals.size() / 2;
std::nth_element(intervals.begin(), median_interval, intervals.end());
return {score, *median_interval};
}
} // namespace
int main() {
std::default_random_engine g;
const auto times = FakeTimes(g, std::sqrt(2));
for (int i = 0; i < 2000; i++) {
const double period = std::pow(1.001, i) / 3;
const Result result = Score(times, period);
std::cout << period << ' ' << result.score << ' ' << result.median_interval
<< std::endl;
}
}