使用 python/numpy 重采样时间序列数据
Resampling time series data using python/numpy
我正在尝试使用 python 中的以下代码对一些时间序列数据进行重新采样。我想要做的是采用 2 个数组,一个具有观察时间,另一个具有值。在给定的示例中 time
和 values
。我想记录特定时间间隔的观察结果,比如时间间隔 [0,2]
中的每个 0.1
。这将给出一个 [[time, values]]
的数组,看起来像 [[0.0, 1], [1.0, 3], [1.2, 5], [1.3, 6], [2.0, 8]]
。现在在下一次更新之间的每个中间值我想用最后一个值填充它,例如。在 [0.0, 1], [1.0, 3]
之间我想要一对 [0.1, 1] ..[0.9, 1]
.
import numpy as np
time = np.array([0, 0.0120, 0.0130, 1, 1.02, 1.2, 1.3, 1.32, 2 ])
values = np.array([1, 3, 2, 3, 4, 5, 6, 7, 8])
#time_interval = 1e-1
newvals = []
def resample(time, values, time_interval):
currentTime = 0.0
newvals = []
vals = []
for idx in range(len(time)):
t = time[idx]
data = values[idx]
if t >= currentTime:
newvals.append([max(t, currentTime),data])
currentTime = max(t, currentTime) +time_interval
else:
continue
#vals = []
newvals_copy = np.array(newvals)
seen = set(newvals_copy[:,0])
for idx in range(len(newvals)):
rec_time = newvals[idx][0]
#seen.add(rec_time)
#print(idx, idx+1, len(newvals))
next_idx = idx+1
if next_idx == len(newvals):
break
else:
#print(idx)
next_time = newvals[idx+1][0]
intermediate_val = np.arange(rec_time , next_time, time_interval)
#print(intermediate_val[])
for value in intermediate_val:
if value not in seen:
#print(idx, value)
newvals.append([value, newvals[idx][1]])
newvals = np.array(newvals)
newvals=np.unique(newvals, axis=0)
return newvals#, len(newvals)
print(resample(time, values, 1e-1))
#print(np.arange(0,2.1,0.1), len(np.arange(0,2.1,0.1)))
此测试用例生成所需的输出,
[[0. 1. ]
[0.1 1. ]
[0.2 1. ]
[0.3 1. ]
[0.4 1. ]
[0.5 1. ]
[0.6 1. ]
[0.7 1. ]
[0.8 1. ]
[0.9 1. ]
[1. 3. ]
[1.1 3. ]
[1.2 5. ]
[1.3 6. ]
[1.4 6. ]
[1.5 6. ]
[1.6 6. ]
[1.7 6. ]
[1.8 6. ]
[1.9 6. ]
[2. 8. ]]
然而,运行 在 real_data = resample(real_time, real_values, 1e-1)
哪里
real_time
= https://filedropper.com/d/s/7Q5IqtRzEh42p2oU0qTrbnuTb8iRK4 ,
real_values
= https://filedropper.com/d/s/hixCEY7QmqGcDBjEanhX2UgocKmV4K
real_data[:,0] = [0. 0.1 0.10000947 0.20000947 0.20000981 0.30000981
0.30001438 0.40001438 0.40002427 0.50002427 0.50006157 0.60006157
0.6000784 0.7000784 0.70009857 0.80009857 0.80010533 0.90010533
0.90012181 1.00012181 1.00020397 1.10020397 1.1002512 1.2002512
1.20025706 1.30025706 1.30026507 1.40026507 1.4002744 1.5002744
1.50028026 1.60028026 1.60029566 1.70029566 1.70029855 1.80029855
1.80030084 1.90030084 1.90032208]
这是不正确的。预期输出应该和测试用例一样,
real_data[:,0]=[0. 0.1 0.2 0.3 0.4 0.5 0.6 0.7 0.8 0.9 1. 1.1 1.2 1.3 1.4 1.5 1.6 1.7
1.8 1.9 2. ]
我猜这个错误是由于浮动 points/decimals 的处理方式造成的?但我不确定如何解决这个问题。
编辑
尝试根据评论四舍五入,更好但不理想
real_data = resample(np.round(real_time, decimals = 1), real_values, 1e-1)
仍然给出一个包含重复项的数组real_data[:,0][0. 0.1 0.2 0.3 0.4 0.5 0.6 0.7 0.8 0.8 0.9 1. 1.1 1.2 1.3 1.4 1.5 1.6 1.7 1.8 1.9 2. ]
理想情况下,我想让这个解决方案适用于任意数据集和时间间隔值。
可能像这样的东西可以满足您的需要:
import numpy as np
from scipy.interpolate import interp1d
# The test data.
time = np.array([0, 0.0120, 0.0130, 1, 1.02, 1.2, 1.3, 1.32, 2])
values = np.array([1, 3, 2, 3, 4, 5, 6, 7, 8])
# The new time basis we're aiming for.
t_new = np.linspace(0, 2, 21)
# Throw away times that we don't like.
new_data = [[t, v] for t, v in zip(time, values) if t in t_new]
t_clean, v_clean = np.array(new_data).T
# Make the interpolator function.
func = interp1d(t_clean, v_clean, kind="previous")
# Interpolate the data into the new time basis.
v_new = func(t_new)
现在 v_new
就像:
array([1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 3., 3., 3., 6., 6., 6., 6.,
6., 6., 6., 8.])
我正在尝试使用 python 中的以下代码对一些时间序列数据进行重新采样。我想要做的是采用 2 个数组,一个具有观察时间,另一个具有值。在给定的示例中 time
和 values
。我想记录特定时间间隔的观察结果,比如时间间隔 [0,2]
中的每个 0.1
。这将给出一个 [[time, values]]
的数组,看起来像 [[0.0, 1], [1.0, 3], [1.2, 5], [1.3, 6], [2.0, 8]]
。现在在下一次更新之间的每个中间值我想用最后一个值填充它,例如。在 [0.0, 1], [1.0, 3]
之间我想要一对 [0.1, 1] ..[0.9, 1]
.
import numpy as np
time = np.array([0, 0.0120, 0.0130, 1, 1.02, 1.2, 1.3, 1.32, 2 ])
values = np.array([1, 3, 2, 3, 4, 5, 6, 7, 8])
#time_interval = 1e-1
newvals = []
def resample(time, values, time_interval):
currentTime = 0.0
newvals = []
vals = []
for idx in range(len(time)):
t = time[idx]
data = values[idx]
if t >= currentTime:
newvals.append([max(t, currentTime),data])
currentTime = max(t, currentTime) +time_interval
else:
continue
#vals = []
newvals_copy = np.array(newvals)
seen = set(newvals_copy[:,0])
for idx in range(len(newvals)):
rec_time = newvals[idx][0]
#seen.add(rec_time)
#print(idx, idx+1, len(newvals))
next_idx = idx+1
if next_idx == len(newvals):
break
else:
#print(idx)
next_time = newvals[idx+1][0]
intermediate_val = np.arange(rec_time , next_time, time_interval)
#print(intermediate_val[])
for value in intermediate_val:
if value not in seen:
#print(idx, value)
newvals.append([value, newvals[idx][1]])
newvals = np.array(newvals)
newvals=np.unique(newvals, axis=0)
return newvals#, len(newvals)
print(resample(time, values, 1e-1))
#print(np.arange(0,2.1,0.1), len(np.arange(0,2.1,0.1)))
此测试用例生成所需的输出,
[[0. 1. ]
[0.1 1. ]
[0.2 1. ]
[0.3 1. ]
[0.4 1. ]
[0.5 1. ]
[0.6 1. ]
[0.7 1. ]
[0.8 1. ]
[0.9 1. ]
[1. 3. ]
[1.1 3. ]
[1.2 5. ]
[1.3 6. ]
[1.4 6. ]
[1.5 6. ]
[1.6 6. ]
[1.7 6. ]
[1.8 6. ]
[1.9 6. ]
[2. 8. ]]
然而,运行 在 real_data = resample(real_time, real_values, 1e-1)
哪里
real_time
= https://filedropper.com/d/s/7Q5IqtRzEh42p2oU0qTrbnuTb8iRK4 ,
real_values
= https://filedropper.com/d/s/hixCEY7QmqGcDBjEanhX2UgocKmV4K
real_data[:,0] = [0. 0.1 0.10000947 0.20000947 0.20000981 0.30000981
0.30001438 0.40001438 0.40002427 0.50002427 0.50006157 0.60006157
0.6000784 0.7000784 0.70009857 0.80009857 0.80010533 0.90010533
0.90012181 1.00012181 1.00020397 1.10020397 1.1002512 1.2002512
1.20025706 1.30025706 1.30026507 1.40026507 1.4002744 1.5002744
1.50028026 1.60028026 1.60029566 1.70029566 1.70029855 1.80029855
1.80030084 1.90030084 1.90032208]
这是不正确的。预期输出应该和测试用例一样,
real_data[:,0]=[0. 0.1 0.2 0.3 0.4 0.5 0.6 0.7 0.8 0.9 1. 1.1 1.2 1.3 1.4 1.5 1.6 1.7
1.8 1.9 2. ]
我猜这个错误是由于浮动 points/decimals 的处理方式造成的?但我不确定如何解决这个问题。
编辑
尝试根据评论四舍五入,更好但不理想
real_data = resample(np.round(real_time, decimals = 1), real_values, 1e-1)
仍然给出一个包含重复项的数组real_data[:,0][0. 0.1 0.2 0.3 0.4 0.5 0.6 0.7 0.8 0.8 0.9 1. 1.1 1.2 1.3 1.4 1.5 1.6 1.7 1.8 1.9 2. ]
理想情况下,我想让这个解决方案适用于任意数据集和时间间隔值。
可能像这样的东西可以满足您的需要:
import numpy as np
from scipy.interpolate import interp1d
# The test data.
time = np.array([0, 0.0120, 0.0130, 1, 1.02, 1.2, 1.3, 1.32, 2])
values = np.array([1, 3, 2, 3, 4, 5, 6, 7, 8])
# The new time basis we're aiming for.
t_new = np.linspace(0, 2, 21)
# Throw away times that we don't like.
new_data = [[t, v] for t, v in zip(time, values) if t in t_new]
t_clean, v_clean = np.array(new_data).T
# Make the interpolator function.
func = interp1d(t_clean, v_clean, kind="previous")
# Interpolate the data into the new time basis.
v_new = func(t_new)
现在 v_new
就像:
array([1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 3., 3., 3., 6., 6., 6., 6.,
6., 6., 6., 8.])