How to combine records based on date using Python connected components?

I have a list of records (person_id, start_date, end_date) as follows:

person_records = [['1', '08/01/2011', '08/31/2011'],
                 ['1', '09/01/2011', '09/30/2011'],
                 ['1', '11/01/2011', '11/30/2011'],
                 ['1', '12/01/2011', '12/31/2011'],
                 ['1', '01/01/2012', '01/31/2012'],
                 ['1', '03/01/2012', '03/31/2012']]

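Each person's records are sorted in ascending order of start_date. I want to consolidate the periods by combining records based on their dates, taking the start_date of the first period as the start date and the end_date of the last period as the end date. However, if the time between the end of one period and the start of the next is 32 days or less, the two should be treated as one continuous period. Otherwise, they are two separate periods: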

consolidated_person_records = [['1', '08/01/2011', '09/30/2011'],
                               ['1', '11/01/2011', '03/31/2012']]

Is there any way to do this using Python connected components?

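I thought about your question, and I originally wrote a routine that maps the date intervals into a 1-D binary array, where each entry in the array is one day and consecutive days are consecutive entries. With this data structure you can perform dilation and erosion to fill in small gaps, thereby merging the intervals, and then map the merged intervals back to date ranges. That solves your problem with standard raster connected-components logic, as per your idea (graph-based connected components would work too...).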

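This works fine, and I can post the code if you are really interested, but then I wondered what advantage that approach has over a simple routine that just iterates through the (pre-sorted) date ranges and merges the next range into the current one if the gap is small.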

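Here is the code for the simple routine. It takes about 120 microseconds to run on the sample data. If you scale up the sample data by repeating it 10,000 times, the routine takes about 1 second on my computer.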

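When I timed the morphology-based solution, it was about 2x slower. It may work better in some situations, but I suggest we try the simple approach first and then see whether there is a real problem that calls for a different algorithm.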

from datetime import datetime
from datetime import timedelta
import numpy as np

The sample data from the question, extended with a second person:

SAMPLE_DATA = [['1', '08/01/2011', '08/31/2011'],
               ['1', '09/01/2011', '09/30/2011'],
               ['1', '11/01/2011', '11/30/2011'],
               ['1', '12/01/2011', '12/31/2011'],
               ['1', '01/01/2012', '01/31/2012'],
               ['1', '03/01/2012', '03/31/2012'],
               ['2', '11/11/2011', '11/30/2011'],
               ['2', '12/11/2011', '12/31/2011'],
               ['2', '01/11/2014', '01/31/2014'],
               ['2', '03/11/2014', '03/31/2014']]

The simple method:

def simple_method(in_data=SAMPLE_DATA, person='1', fill_gap_days=31, printit=False):
    date_format_str = "%m/%d/%Y"
    dat = np.array(in_data)
    dat = dat[dat[:, 0] == person, 1:]  # just this person's data
    # assume date intervals are already sorted by start date
    new_intervals = []
    cur_start = None
    cur_end = None
    gap_days = timedelta(days=fill_gap_days)
    for (s_str, e_str) in dat:
        dt_start = datetime.strptime(s_str, date_format_str)
        dt_end = datetime.strptime(e_str, date_format_str)
        if cur_end is None:
            cur_start = dt_start
            cur_end = dt_end
            continue
        else:
            if cur_end + gap_days >= dt_start:
                # merge, keep existing cur_start, extend cur_end
                cur_end = dt_end
            else:
                # new interval, save previous and reset current to this
                new_intervals.append((cur_start, cur_end))
                cur_start = dt_start
                cur_end = dt_end
    # make sure final interval is saved
    new_intervals.append((cur_start, cur_end))

    if printit:
        print_it(person, new_intervals, date_format_str)

    return new_intervals

Here is a simple pretty-print function for printing the ranges.

def print_it(person, consolidated_ranges, fmt):
    for (s, e) in consolidated_ranges:
        print(person, s.strftime(fmt), e.strftime(fmt))

Running it in IPython looks like this. Note that printing the results can be turned off when timing the computation.

In [10]: _ = simple_method(printit=True)
1 08/01/2011 09/30/2011
1 11/01/2011 03/31/2012

Timing it in IPython with the %timeit magic:

In [8]: %timeit simple_method(in_data=SAMPLE_DATA)
10000 loops, best of 3: 118 µs per loop

In [9]: %timeit simple_method(in_data=SAMPLE_DATA*10000)
1 loops, best of 3: 1.06 s per loop
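
The routine above handles one person per call. As a rough sketch of how the same loop might cover every person in one pass, the following uses only the standard library. The name `consolidate_all` is hypothetical, and the sketch assumes the records are grouped by person and sorted by start date within each person. It also uses `max()` when extending a range, which is an addition not present in the routine above, to guard against a record that ends before the current range does.

```python
from datetime import datetime, timedelta
from itertools import groupby
from operator import itemgetter

DATE_FMT = "%m/%d/%Y"

def consolidate_all(records, fill_gap_days=31):
    """Hypothetical helper: merge each person's date ranges in a single pass.

    Assumes records are [person_id, start, end] strings, grouped by person
    and sorted by start date within each person.
    """
    gap = timedelta(days=fill_gap_days)
    merged = []
    for person, rows in groupby(records, key=itemgetter(0)):
        cur_start = cur_end = None
        for _, s_str, e_str in rows:
            start = datetime.strptime(s_str, DATE_FMT)
            end = datetime.strptime(e_str, DATE_FMT)
            if cur_end is not None and cur_end + gap >= start:
                # close enough: extend the current range
                cur_end = max(cur_end, end)
            else:
                # gap too large (or first record): save previous, start anew
                if cur_end is not None:
                    merged.append([person, cur_start, cur_end])
                cur_start, cur_end = start, end
        # groupby never yields an empty group, so a range is always pending
        merged.append([person, cur_start, cur_end])
    return [[p, s.strftime(DATE_FMT), e.strftime(DATE_FMT)] for p, s, e in merged]

records = [['1', '08/01/2011', '08/31/2011'],
           ['1', '09/01/2011', '09/30/2011'],
           ['1', '11/01/2011', '11/30/2011'],
           ['2', '11/11/2011', '11/30/2011'],
           ['2', '12/11/2011', '12/31/2011']]
result = consolidate_all(records)
```

Converting back to strings at the end is only for display; keeping datetime objects may be more convenient if further date arithmetic follows.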

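[Edit Feb 8, 2016: to make a long answer longer...] As I stated in my reply, I did create a morphology / 1-D connected-components version, and in my timing it was about 2x slower. For the sake of completeness I'll show the morphological approach anyway; maybe others will see whether there is room for a big speed-up in it.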

#using same imports as previous code with one more
import calendar as cal

def make_occupancy_array(start_year, end_year):
    """
    Represents the time between the start and end years, inclusively, as a 1-D array
    of 'pixels', where each pixel corresponds to a day. Consecutive days are thus
    mapped to consecutive pixels. We can perform morphology on this 1D array to
    close small gaps between date ranges.
    """
    years_days = [(yr, 366 if cal.isleap(yr) else 365) for yr in range(start_year, end_year+1)]
    YD = np.array(years_days)  # like [ (2011, 365), (2012, 366), ... ] in ndarray form
    total_num_days = YD[:, 1].sum()
    occupancy = np.zeros((total_num_days,), dtype='int')
    return YD, occupancy

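With the occupancy array representing the time intervals, we need two functions: one to map a date to a position in the array, and one for the inverse mapping.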

def map_date_to_position(dt, YD):
    """
    Maps the datetime value to a position in the occupancy array
    """
    # the position is the offset to day 1 of dt's year,
    # plus the day of year - 1 for dt (day of year is 1-based)
    yr = dt.year
    assert yr in YD[:, 0]  # guard...YD should include all years for this person's dates
    position = YD[YD[:, 0] < yr, 1].sum()  # the sum of the days in year before this year
    position += dt.timetuple().tm_yday - 1
    return position


def map_position_to_date(pos, YD):
    """
    Inverse of map_date_to_position, this maps a position in the
    occupancy array back to a datetime value
    """
    yr_offsets = np.cumsum(YD[:, 1])
    day_offsets = yr_offsets - pos
    idx = np.flatnonzero(day_offsets > 0)[0]
    year = YD[idx, 0]
    day_of_year = pos if idx == 0 else pos - yr_offsets[idx-1]
    # construct datetime as first of year plus day offset in year
    dt = datetime.strptime(str(year), "%Y")
    dt += timedelta(days=int(day_of_year)+1)
    return dt
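
As an aside, the year table is not the only way to build such a mapping. The sketch below swaps in a different technique, day offsets from a base date via `date.toordinal()` and `date.fromordinal()`, and is not the method used in this answer. The function names and the `base` parameter are hypothetical. Unlike the pair above, this mapping is an exact inverse with no off-by-one to compensate for elsewhere.

```python
from datetime import date

def date_to_position(d, base):
    """Hypothetical alternative: day offset of d from the base date."""
    return d.toordinal() - base.toordinal()

def position_to_date(pos, base):
    """Inverse mapping: the date that lies pos days after the base date."""
    return date.fromordinal(base.toordinal() + pos)

base = date(2011, 1, 1)  # first day covered by the occupancy array
pos = date_to_position(date(2012, 3, 31), base)
round_trip = position_to_date(pos, base)
```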

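The following function, given a start date and an end date (inclusive), fills in the relevant part of the occupancy array, optionally extending the end of the range by a gap-filling margin (like a one-sided dilation).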

def set_occupancy(dt1, dt2, YD, occupancy, fill_gap_days=0):
    """
    For a date range starting dt1 and ending, inclusively, dt2,
    sets the corresponding 'pixels' in occupancy vector to 1.
    If fill_gap_days > 0, then the end 'pixel' is extended
    (dilated) by this many positions, so that we can fill
    the gaps between intervals that are close to each other.
    """
    pos1 = map_date_to_position(dt1, YD)
    pos2 = map_date_to_position(dt2, YD) + fill_gap_days
    occupancy[pos1:pos2] = 1

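Once we have the merged intervals in the occupancy array, we need to read them back out as date intervals, optionally performing a one-sided erosion if we did gap filling earlier.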

def get_occupancy_intervals(OCC, fill_gap_days=0):
    """
    Find the runs in the OCC array corresponding
    to the 'dilated' consecutive positions, and then
    'erode' back to the correct end dates by subtracting
    the fill_gap_days.
    """
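    # prepend a zero so that a run starting at position 0 still has a rising edge;
    # edges[i] == OCC[i] - OCC[i-1]
    edges = np.diff(np.concatenate(([0], OCC)))
    # subtract 1 to keep the original convention: 'starts' is the position just
    # before each run, 'ends' is the last occupied position of each run
    starts = np.flatnonzero(edges > 0) - 1
    ends = np.flatnonzero(edges < 0) - 1
    ends -= fill_gap_days  # erode back to original length prior to dilation
    return list(zip(starts, ends))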
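
To make the dilate / find-runs / erode idea easier to follow, here is a toy sketch on plain integer positions instead of dates. The name `merge_runs` is hypothetical and the index convention is its own (inclusive ends, with `fill_gap` counting the empty positions allowed between two runs), so it is an illustration of the technique rather than a drop-in replacement for the functions above.

```python
import numpy as np

def merge_runs(intervals, length, fill_gap):
    """Toy sketch: merge [start, end] index intervals (end inclusive) that
    are separated by at most fill_gap empty positions."""
    # extra room on the right so a dilated run never touches the array edge
    occ = np.zeros(length + fill_gap + 1, dtype=int)
    for start, end in intervals:
        occ[start:end + 1 + fill_gap] = 1  # mark the run and dilate its end
    # a leading zero gives a run starting at index 0 a rising edge
    edges = np.diff(np.concatenate(([0], occ)))
    starts = np.flatnonzero(edges > 0)               # first index of each run
    ends = np.flatnonzero(edges < 0) - 1 - fill_gap  # last index, eroded back
    return [(int(s), int(e)) for s, e in zip(starts, ends)]

# (0, 2) and (5, 6) are two empty positions apart, so they merge;
# (12, 13) is five empty positions away from (5, 6), so it stays separate
merged = merge_runs([(0, 2), (5, 6), (12, 13)], length=16, fill_gap=2)
```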

Putting it all together...

def morphology_method(in_data=SAMPLE_DATA, person='1', fill_gap_days=31, printit=False):
    date_format_str = "%m/%d/%Y"
    dat = np.array(in_data)
    dat = dat[dat[:, 0] == person, 1:]  # just this person's data

    # for the intervals of this person, get starting and ending years
    # we assume the data is already sorted
    #start_year = datetime.strptime(dat[0, 0], date_format_str)
    #end_year = datetime.strptime(dat[-1, 1], date_format_str)
    start_times = [datetime.strptime(d, date_format_str) for d in dat[:, 0]]
    end_times = [datetime.strptime(d, date_format_str) for d in dat[:, 1]]
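    start_year = start_times[0].year
    # cover the year reached by the dilated final end date, so the last run
    # always has a falling edge inside the occupancy array
    end_year = (end_times[-1] + timedelta(days=fill_gap_days)).year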

    # create the occupancy array, dilated so that each interval
    # is extended by fill_gap_days to 'fill in' the small gaps
    # between intervals
    YD, OCC = make_occupancy_array(start_year, end_year)
    for (s, e) in zip(start_times, end_times):
        set_occupancy(s, e, YD, OCC, fill_gap_days)

    # return the intervals from OCC after having filled gaps,
    # and trim end dates back to original position.
    consolidated_pos = get_occupancy_intervals(OCC, fill_gap_days)

    # map positions back to date-times
    consolidated_ranges = [(map_position_to_date(s, YD), map_position_to_date(e, YD)) for
                           (s, e) in consolidated_pos]

    if printit:
        print_it(person, consolidated_ranges, date_format_str)

    return consolidated_ranges

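09/30/2011 + 32 days = 11/01/2011, so your example doesn't work as stated: with a 32-day limit the first two groups would merge. You probably meant 31 days or less.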
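
The arithmetic can be checked quickly with the standard library; the variable names below are only for illustration.

```python
from datetime import date, timedelta

end_of_first = date(2011, 9, 30)
start_of_next = date(2011, 11, 1)

gap = start_of_next - end_of_first
print(gap.days)                                            # 32
print(end_of_first + timedelta(days=32) == start_of_next)  # True
```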

When working with dates in Python, you can use datetime and timedelta from the datetime module. Use strptime and strftime to convert from and to strings such as '09/01/2011'.

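I prefer to convert everything to datetime objects at the start, do all the date-related processing, and then convert back to date strings at the end if needed.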

from datetime import datetime, timedelta

PERSON_ID = 0
START_DATE = 1
END_DATE = 2

def consolidate(records, maxgap=timedelta(days=31)):
    consolidated = []
    consolidated_start = records[0][START_DATE]
    consolidated_end = records[0][END_DATE]

    for person_id, start_date, end_date in records:

        if start_date <= consolidated_end + maxgap:
            consolidated_end = end_date

        else:
            consolidated.append([person_id, consolidated_start, consolidated_end])
            consolidated_start = start_date
            consolidated_end = end_date

    else:
        consolidated.append([person_id, consolidated_start, consolidated_end])

    return consolidated


fmt = "%m/%d/%Y"

records = [[id, datetime.strptime(start, fmt), datetime.strptime(end, fmt)] for id, start, end in person_records]

records = consolidate(records)

records = [[id, start.strftime(fmt), end.strftime(fmt)] for id, start, end in records]

Edit: here is a version of consolidate() that uses connected_components:

import numpy as np
from scipy.sparse.csgraph import connected_components

def consolidate(records, maxgap=32):
    person_id = records[0][0]

    dates = np.array([[rec[1].date(), rec[2].date()] for rec in records], dtype='datetime64')
    start_dates, end_dates = dates.T

    gaps = start_dates[1:] - end_dates[:-1]

    conns = np.diagflat(gaps < np.timedelta64(maxgap, 'D'), 1)

    num_comps, comps = connected_components(conns)

    return [[person_id, 
             min(start_dates[comps==i]).astype(datetime),
             max(end_dates[comps==i]).astype(datetime)
            ] for i in range(num_comps)
           ]
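
For intuition about what the component labels look like, here is a pure-Python union-find sketch that links neighbouring records whenever their gap is small enough. It is a swapped-in illustration, not how scipy computes components, and `chain_components` is a hypothetical name. The gaps listed are the day differences between consecutive records of person '1' in the question.

```python
def chain_components(gaps_ok):
    """Hypothetical helper: label records 0..n-1 with a component id, where
    gaps_ok[i] is True when record i and record i+1 should be linked."""
    n = len(gaps_ok) + 1
    parent = list(range(n))

    def find(i):
        # follow parent links to the root, halving the path as we go
        while parent[i] != i:
            parent[i] = parent[parent[i]]
            i = parent[i]
        return i

    for i, ok in enumerate(gaps_ok):
        if ok:
            parent[find(i + 1)] = find(i)  # union neighbours i and i+1

    # renumber roots as 0, 1, 2, ... in order of first appearance
    labels, ids = [], {}
    for i in range(n):
        labels.append(ids.setdefault(find(i), len(ids)))
    return labels

# gaps between the six sample records of person '1': 1, 32, 1, 1, 30 days
labels = chain_components([g < 32 for g in (1, 32, 1, 1, 30)])
```

Records that share a label belong to the same consolidated period, so the minimum start date and maximum end date per label give the merged range.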