Expanding timeseries events with Pandas
Question
I am looking for suggestions on how to make this more pythonic and more efficient.
I have a dataframe of events, each with at least a start and an end timestamp. I am expanding the number of records so that the new table has one record for every hour that the interval overlaps.
This is essentially the same as the IntervalMatch function in QlikView.
Example:
An event from 18:00-20:00 is expanded into two distinct records, one for 18:00-19:00 and another for 19:00-20:00.
Current solution
I have a fully working solution, but I think it is rather ugly, and it is very slow on large datasets with >100k rows and 10-20 columns.
import pandas as pd
from datetime import timedelta

def interval_match(df):
    intervals = []

    def perdelta(start, end, delta):
        curr = start.replace(minute=0, second=0)
        while curr < end:
            yield curr
            curr += delta

    def interval_split(x):
        for t in perdelta(x.Start, x.End, timedelta(hours=1)):
            _ = ([x.id,
                  x.Start,
                  x.End,
                  max(t, x.Start),
                  min((t + timedelta(hours=1), x.End))])
            intervals.append(_)

    df.apply(interval_split, axis=1)

    ndf = pd.DataFrame(intervals,
                       columns=['id',
                                'Start',
                                'End',
                                'intervalStart',
                                'intervalEnd'])
    ndf['Duration'] = ndf.intervalEnd - ndf.intervalStart
    return ndf
With some sample data, the function interval_match() can be used like this:
# Some example data
df = pd.DataFrame({'End': {0: pd.Timestamp('2016-01-01 09:24:20')},
                   'Start': {0: pd.Timestamp('2016-01-01 06:56:10')},
                   'id': {0: 1234562}})

# Running the function
interval_match(df).to_dict()
# Output
{'Duration': {0: Timedelta('0 days 00:03:50'),
              1: Timedelta('0 days 01:00:00'),
              2: Timedelta('0 days 01:00:00'),
              3: Timedelta('0 days 00:24:20')},
 'End': {0: Timestamp('2016-01-01 09:24:20'),
         1: Timestamp('2016-01-01 09:24:20'),
         2: Timestamp('2016-01-01 09:24:20'),
         3: Timestamp('2016-01-01 09:24:20')},
 'Start': {0: Timestamp('2016-01-01 06:56:10'),
           1: Timestamp('2016-01-01 06:56:10'),
           2: Timestamp('2016-01-01 06:56:10'),
           3: Timestamp('2016-01-01 06:56:10')},
 'intervalEnd': {0: Timestamp('2016-01-01 07:00:00'),
                 1: Timestamp('2016-01-01 08:00:00'),
                 2: Timestamp('2016-01-01 09:00:00'),
                 3: Timestamp('2016-01-01 09:24:20')},
 'intervalStart': {0: Timestamp('2016-01-01 06:56:10'),
                   1: Timestamp('2016-01-01 07:00:00'),
                   2: Timestamp('2016-01-01 08:00:00'),
                   3: Timestamp('2016-01-01 09:00:00')},
 'id': {0: 1234562,
        1: 1234562,
        2: 1234562,
        3: 1234562}}
My wishes are:
- To make it more efficient, preferably using built-in Pandas functions or some numpy magic (one possible direction is sketched below).
- Not to have to handle the columns the way I do in the interval_split function today, but simply operate on and expand the entire dataframe.
Any suggestions or help are appreciated.
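For what it is worth, one direction that leans mostly on built-in pandas operations is to compute the hourly cut points per event and let DataFrame.explode do the row expansion. The sketch below is only illustrative, not the poster's code: it assumes the columns are named id/Start/End as in the example above, a unique index, and pandas >= 1.4 (for the inclusive keyword of date_range); the cut points themselves are still built row by row.

import pandas as pd

def interval_match_explode(df):
    # Hypothetical variant: build the cut points per event, then let pandas
    # do the expansion. Assumes a unique index and pandas >= 1.4.
    def cut_points(row):
        # Cut points per event: Start, every full hour strictly inside
        # (Start, End), and End.
        first_hour = row.Start.floor('H') + pd.Timedelta(hours=1)
        hours = pd.date_range(first_hour, row.End, freq='H', inclusive='left')
        return [row.Start, *hours, row.End]

    out = df.copy()
    out['cuts'] = [cut_points(row) for row in df.itertuples(index=False)]
    out = out.explode('cuts')                  # one row per cut point, all columns kept
    out['cuts'] = pd.to_datetime(out['cuts'])
    out['intervalStart'] = out['cuts']
    out['intervalEnd'] = out.groupby(level=0)['cuts'].shift(-1)  # pair consecutive cuts
    out = out.dropna(subset=['intervalEnd']).drop(columns='cuts')
    out['Duration'] = out['intervalEnd'] - out['intervalStart']
    return out.reset_index(drop=True)

On the example frame above this should yield the same four intervals as interval_match, with the expansion itself done by explode and shift rather than by appending to a Python list.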
I made a variant (inspired by your code) and it ran very slowly: processing 20k rows of data took about 5 minutes, and after profiling, the culprit was .append. The trick is to put all the records into a dictionary and then use the DataFrame's from_dict method. Using from_dict on the same 20k rows, it completed in about 5 seconds (so roughly 60x faster).
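For illustration, the pattern described above looks roughly like this (a minimal sketch; the toy records and column names here are made up, only the dict-then-from_dict idea is from the answer):

import pandas as pd

# Accumulate rows in a plain dict keyed by their output position, then build the
# DataFrame once at the end. DataFrame.append copies all existing data on every
# call (and was removed in pandas 2.0), which is what made the slow variant crawl.
rows = {}
for position, record in enumerate([(1, 'Add'), (2, 'Edit'), (3, 'Delete')]):
    rows[position] = record
result = pd.DataFrame.from_dict(rows, orient='index', columns=['id', 'action'])
print(result)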
I have attached my code. It is inspired by yours, and it is also generic with respect to the column input (my test usage differs from my production usage).
import pandas as pd
from collections import namedtuple
from datetime import timedelta

Interval = namedtuple('Interval', 'field_name start_time end_time delta')

class IntervalMatch(object):
    def __init__(self):
        pass

    def per_delta(self, interval: Interval, include_start: bool):
        current_interval = interval.start_time
        if not include_start:
            current_interval += pd.DateOffset(seconds=interval.delta)
        while current_interval < interval.end_time:
            yield current_interval
            current_interval += pd.DateOffset(seconds=interval.delta)

    def _copy(self, row, columns: pd.Index):
        values = pd.Series(row).values
        return pd.DataFrame([values], columns=columns.values).copy(True)

    def interval_split(self, interval: Interval, base_row: pd.Series, columns: pd.Index, include_start: bool):
        for time in self.per_delta(interval, include_start):
            extended_row = self._copy(base_row, columns)
            extended_row.at[0, interval.field_name] = time
            yield extended_row

    def get_exploded_records(self, data_to_examine: pd.DataFrame, time_field_name: str):
        last_row = None
        results = pd.DataFrame()
        delta = 1  # second
        time_col_index = data_to_examine.columns.get_loc(time_field_name)

        # Process each row. It is possible there is a map/reduce/fluent way of doing this w/ Pandas.
        intermediate_results = {}
        current_row = -1
        for row in data_to_examine.itertuples(index=False):
            current_row += 1
            if last_row is None:
                last_row = row
                intermediate_results[current_row] = row
                continue

            total_seconds = (row[time_col_index] - last_row[time_col_index]).total_seconds()
            if total_seconds > 1 and total_seconds < 100:
                # There is a gap, so we want to explode the gap into the data and fill it with last_row values.
                interval = Interval(time_field_name, last_row[time_col_index], row[time_col_index], delta)
                for intrvl in self.interval_split(interval, last_row, data_to_examine.columns, False):
                    # We must unroll the list of rows to just the first row (since there is only one).
                    intermediate_results[current_row] = intrvl.values[0]
                    current_row += 1

            # Append the current row.
            intermediate_results[current_row] = row
            last_row = row

        results = pd.DataFrame.from_dict(intermediate_results, orient='index')  # , columns=data_to_examine.columns
        return results

def test():
    print("Preparing Data")
    timestamps = ['2016-01-01 09:24:20', '2016-01-01 09:24:21',
                  '2016-01-01 09:24:23', '2016-01-01 09:24:24', '2016-01-01 09:24:40']
    data_with_gaps = pd.DataFrame({'timestamp': [pd.Timestamp(timestamp) for timestamp in timestamps],
                                   'names': ['Torial', 'Torial', 'Knut', 'Knut', 'Torial'],
                                   'action': ['Add', 'Edit', 'Add', 'Edit', 'Delete']})
    interval = IntervalMatch()
    print("Getting Exploded Records")
    exploded = interval.get_exploded_records(data_with_gaps, 'timestamp')
    print(f"Data with Gaps: {data_with_gaps}")
    print(f"Exploded: {exploded}")
    exploded.to_csv("Exploded_test.csv")
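A small usage note (not part of the original answer): test() is defined but never invoked, so to run it directly as a script a standard entry-point guard can be added, for example:

if __name__ == '__main__':
    test()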