添加子集匹配维度的区间树?

Interval tree with added dimension of subset matching?

这是一个关于一个有点复杂的问题的算法问题。基础是这样的:

基于可用槽保留槽的调度系统。插槽有一定的标准,我们称它们为 标签。如果可用插槽的标签集是保留插槽的超集,则保留将通过这些标签与可用插槽匹配。

作为一个具体的例子,以这个场景为例:

11:00  12:00  13:00
+--------+
| A, B   |
+--------+
       +--------+
       | C, D   |
       +--------+

在11:00到12:30时间之间可以预订AB标签,从12:00到13:30 CD 可用,并且从大约 12:00 到 12:30.

有重叠
11:00  12:00  13:00
+--------+
| A, B   |
+--------+
       +--------+
       | C, D   |
       +--------+
  xxxxxx
  x A  x
  xxxxxx

此处已预订 A,因此 AB 无法在 11:15-ish 和 [=101= 之间进行其他预订]-ish.

简而言之就是这个想法。可用插槽没有特定限制:

系统中唯一需要遵守的规则是:

澄清一下:如果同时有两个可用空档,例如标签 A,那么可以同时为 A 进行两次预留,但不能再多了。

我正在使用 interval tree 的修改后的实现;快速概览:

当该过程完成后,剩下的是剩余的可用时段,我可以查询是否可以为特定时间进行新预订并添加它。

数据结构看起来像这样:

{
  type: 'available', 
  begin: 1497857244, 
  end: 1497858244, 
  tags: [{ foo: 'bar' }, { baz: 42 }]
}
{
  type: 'reserved', 
  begin: 1497857345, 
  end: 1497857210, 
  tags: [{ foo: 'bar' }]
}

标签本身就是键值对象,它们的列表是一个"tag set"。如果有帮助,可以将它们连载;到目前为止,我使用的是 Python set 类型,这使得比较非常容易。 Slot begin/end 时间是树中的 UNIX 时间戳。我不是特别喜欢这些特定的数据结构,如果有用的话可以重构它们。


我面临的问题是这不是没有错误的;每隔一段时间,一个预订就会潜入系统并与其他预订发生冲突,我还不知道这到底是怎么发生的。当标签以复杂的方式重叠时,它也不是很聪明,需要计算最佳分布,以便所有预订都能尽可能地适合可用的插槽;事实上,目前还不确定如何在重叠场景中将预留与可用空档相匹配。

我想知道的是:区间树最适合用于此目的,但我当前添加标签集匹配作为附加维度的系统笨拙且固定; 有没有一种数据结构或算法可以优雅地处理这个问题?

必须支持的动作:

  1. 向系统查询与特定标签集匹配的可用空位(考虑到可能会降低可用性但本身不属于所述标签集的预订;例如,在上面的示例中查询 B 的可用性).
  2. 确保没有匹配的可用空档的预订不能添加到系统。

您的问题可以使用 constraint programming. In python this can be implemented using the python-constraint 库解决。

首先,我们需要一种方法来检查两个插槽是否相互一致。如果两个插槽共享一个标签并且它们的框架重叠,则此功能 return 为真。在 python 中,这可以使用以下函数实现

def checkNoOverlap(slot1, slot2):
    shareTags = False
    for tag in slot1['tags']:
        if tag in slot2['tags']:
            shareTags = True
            break    
    if not shareTags: return True
    return not (slot2['begin'] <= slot1['begin'] <= slot2['end'] or 
                slot2['begin'] <= slot1['end'] <= slot2['end'])

我不确定您是希望标签完全相同(如 {foo: bar} 等于 {foo: bar})还是仅键(如 {foo: bar} 等于 {foo: qux} ), 但您可以在上面的函数中更改它。

一致性检查

我们可以将 python-constraint 模块用于您请求的两种功能。

第二个功能是最简单的。为了实现这一点,我们可以使用函数 isConsistent(set),它将所提供的数据结构中的槽列表作为输入。该函数然后将所有插槽提供给 python-constraint 并将检查插槽列表是否一致(没有 2 个插槽不应该重叠,重叠)和 return 一致性。

def isConsistent(set):
        #initialize python-constraint context
        problem = Problem()
        #add all slots the context as variables with a singleton domain
        for i in range(len(set)):
            problem.addVariable(i, [set[i]])        
        #add a constraint for each possible pair of slots
        for i in range(len(set)):
            for j in range(len(set)):
                #we don't want slots to be checked against themselves
                if i == j:
                    continue
                #this constraint uses the checkNoOverlap function
                problem.addConstraint(lambda a,b: checkNoOverlap(a, b), (i, j))
        # getSolutions returns all the possible combinations of domain elements
        # because all domains are singleton, this either returns a list with length 1 (consistent) or 0 (inconsistent)
        return not len(problem.getSolutions()) == 0

只要用户想要添加预订时段,就可以调用此函数。可以将输入槽添加到已经存在的槽列表中,并可以检查一致性。如果一致,则新插槽将被保留。否则,新的插槽重叠,应该被拒绝。

正在查找可用的插槽

这个问题有点棘手。我们可以使用与上面相同的功能并进行一些重大更改。我们现在想要将所有可能的插槽添加到现有插槽中,而不是将新插槽与现有插槽一起添加。然后我们可以检查所有这些可能的插槽与保留插槽的一致性,并向约束系统询问一致的组合。

因为如果不做任何限制,可能的槽数是无限的,所以我们首先需要为程序声明一些参数:

MIN = 149780000 #available time slots can never start earlier then this time
MAX = 149790000 #available time slots can never start later then this time
GRANULARITY = 1*60 #possible time slots are always at least one minut different from each other

我们现在可以继续执行主要功能了。它看起来很像一致性检查,但我们现在添加一个变量来发现所有可用的插槽,而不是来自用户的新插槽。

def availableSlots(tags, set):
    #same as above
    problem = Problem()
    for i in range(len(set)):
        problem.addVariable(i, [set[i]])
    #add an extra variable for the available slot is added, with a domain of all possible slots
    problem.addVariable(len(set), generatePossibleSlots(MIN, MAX, GRANULARITY, tags))
    for i in range(len(set) +1):
        for j in range(len(set) +1):
            if i == j:
                continue
            problem.addConstraint(lambda a, b: checkNoOverlap(a, b), (i, j))
    #extract the available time slots from the solution for clean output
    return filterAvailableSlots(problem.getSolutions())

我使用了一些辅助函数来保持代码更简洁。它们包含在这里。

def filterAvailableSlots(possibleCombinations):
    result = []
    for slots in possibleCombinations:
        for key, slot in slots.items():
            if slot['type'] == 'available':
                result.append(slot)

    return result

def generatePossibleSlots(min, max, granularity, tags):
    possibilities = []
    for i in range(min, max - 1, granularity):
        for j in range(i + 1, max, granularity):
            possibleSlot = {
                              'type': 'available',
                              'begin': i,
                              'end': j,
                              'tags': tags
            }
            possibilities.append(possibleSlot)
    return tuple(possibilities)

您现在可以将函数 getAvailableSlots(tags, set) 与您想要可用插槽和一组已预留插槽的标签一起使用。请注意,此函数实际上 return 所有一致的可能插槽,因此无需努力寻找最大长度之一或进行其他优化。

希望对您有所帮助! (我按照你在 pycharm 中描述的那样工作)

这是一个解决方案,我将在下面包含所有代码。

1。创建一个 table 个空位和一个 table 个预留

2。创建保留矩阵 x 插槽

根据该预留时段组合是否可能由真值或假值填充

3。找出允许最多预留槽组合的最佳映射

注意:我当前的解决方案无法很好地适应非常大的数组,因为它涉及循环遍历大小 = 槽数的列表的所有可能排列。我已发布 another question 以查看是否有人可以找到更好的方法。然而,这个解决方案是准确的并且可以优化

Python代码来源

第 1 部分

from IPython.display import display
import pandas as pd
import datetime

available_data = [
    ['SlotA', datetime.time(11, 0, 0), datetime.time(12, 30, 0), set(list('ABD'))],
    ['SlotB',datetime.time(12, 0, 0), datetime.time(13, 30, 0), set(list('C'))],
    ['SlotC',datetime.time(12, 0, 0), datetime.time(13, 30, 0), set(list('ABCD'))],
    ['SlotD',datetime.time(12, 0, 0), datetime.time(13, 30, 0), set(list('AD'))],
]

reservation_data = [
    ['ReservationA', datetime.time(11, 15, 0), datetime.time(12, 15, 0), set(list('AD'))],
    ['ReservationB', datetime.time(11, 15, 0), datetime.time(12, 15, 0), set(list('A'))],
    ['ReservationC', datetime.time(12, 0, 0), datetime.time(12, 15, 0), set(list('C'))],
    ['ReservationD', datetime.time(12, 0, 0), datetime.time(12, 15, 0), set(list('C'))],
    ['ReservationE', datetime.time(12, 0, 0), datetime.time(12, 15, 0), set(list('D'))]
]

reservations = pd.DataFrame(data=reservation_data, columns=['reservations', 'begin', 'end', 'tags']).set_index('reservations')
slots = pd.DataFrame(data=available_data, columns=['slots', 'begin', 'end', 'tags']).set_index('slots')

display(slots)
display(reservations)

第 2 部分

def is_possible_combination(r):
    return (r['begin'] >= slots['begin']) & (r['end'] <= slots['end']) & (r['tags'] <= slots['tags'])

solution_matrix = reservations.apply(is_possible_combination, axis=1).astype(int)
display(solution_matrix)

第 3 部分

import numpy as np
from itertools import permutations

# add dummy columns to make the matrix square if it is not
sqr_matrix = solution_matrix
if sqr_matrix.shape[0] > sqr_matrix.shape[1]:
    # uhoh, there are more reservations than slots... this can't be good
    for i in range(sqr_matrix.shape[0] - sqr_matrix.shape[1]):
        sqr_matrix.loc[:,'FakeSlot' + str(i)] = [1] * sqr_matrix.shape[0]
elif sqr_matrix.shape[0] < sqr_matrix.shape[1]:
    # there are more slots than customers, why doesn't anyone like us?
    for i in range(sqr_matrix.shape[0] - sqr_matrix.shape[1]):
        sqr_matrix.loc['FakeCustomer' + str(i)] = [1] * sqr_matrix.shape[1]

# we only want the values now
A = solution_matrix.values.astype(int)

# make an identity matrix (the perfect map)
imatrix = np.diag([1]*A.shape[0])

# randomly swap columns on the identity matrix until they match. 
n = A.shape[0]

# this will hold the map that works the best
best_map_so_far = np.zeros([1,1])

for column_order in permutations(range(n)):
    # this is an identity matrix with the columns swapped according to the permutation
    imatrix = np.zeros(A.shape)
    for row, column in enumerate(column_order):
        imatrix[row,column] = 1

    # is this map better than the previous best?
    if sum(sum(imatrix * A)) > sum(sum(best_map_so_far)):
        best_map_so_far = imatrix

    # could it be? a perfect map??
    if sum(sum(imatrix * A)) == n:
        break

if sum(sum(imatrix * A)) != n:
    print('a perfect map was not found')

output = pd.DataFrame(A*imatrix, columns=solution_matrix.columns, index=solution_matrix.index, dtype=int)
display(output)

and 建议的方法都有帮助,但最终还不够。我想出了一个很好解决它的混合方法。

主要问题是这是一个三维的问题,很难一次解决所有维度的问题。这不仅仅是匹配时间重叠或标签重叠,而是匹配 时间片 与标签重叠。根据时间甚至标签将空档与其他空档进行匹配非常简单,但是将已经匹配的可用性空档与另一个时间的另一个预订进行匹配就相当复杂了。意思是,在这种情况下,一个可用性可以覆盖不同时间的两个预订:

+---------+
| A, B    |
+---------+
xxxxx xxxxx
x A x x A x
xxxxx xxxxx

试图将其纳入基于约束的编程中需要极其复杂的约束关系,这很难管理。我对此的解决方案是简化问题……

删除一维

它不是一次解决所有维度,而是极大地简化了问题以在很大程度上删除时间维度。我通过使用现有的间隔树并根据需要对其进行切片来做到这一点:

def __init__(self, slots):
    self.tree = IntervalTree(slots)

def timeslot_is_available(self, start: datetime, end: datetime, attributes: set):
    candidate = Slot(start.timestamp(), end.timestamp(), dict(type=SlotType.RESERVED, attributes=attributes))
    slots = list(self.tree[start.timestamp():end.timestamp()])
    return self.model_is_consistent(slots + [candidate])

为了查询一个特定的槽是否可用,我只取那个特定时间相关的槽(self.tree[..:..]),这将计算的复杂性降低到一个局部子集:

  |      |             +-+ = availability
+-|------|-+           xxx = reservation
  |  +---|------+
xx|x  xxx|x
  |  xxxx|
  |      |

然后我确认那个窄片内的一致性:

@staticmethod
def model_is_consistent(slots):
    def can_handle(r):
        return lambda a: r.attributes <= a.attributes and a.contains_interval(r)

    av = [s for s in slots if s.type == SlotType.AVAILABLE]
    rs = [s for s in slots if s.type == SlotType.RESERVED]

    p = Problem()
    p.addConstraint(AllDifferentConstraint())
    p.addVariables(range(len(rs)), av)

    for i, r in enumerate(rs):
        p.addConstraint(can_handle(r), (i,))

    return p.getSolution() is not None

(我在这里省略了一些优化和其他代码。)

这部分是 Arne 和 Tinker 建议的混合方法。它使用基于约束的编程来找到匹配的插槽,使用修补匠建议的矩阵算法。基本上:如果这个问题有任何解决方案,其中所有预订都可以分配给不同的可用时隙,那么这个时间片就处于一致状态。由于我正在传递所需的预留槽,如果模型仍然一致,包括该槽,这意味着可以安全地预留该槽。

如果在这个狭窄 window 范围内有两个可分配给同一可用性的短期预订,这仍然是有问题的,但这种情况的可能性很低,结果只是可用性查询的假阴性;误报会更成问题。

正在查找可用的插槽

找到所有可用的插槽是一个更复杂的问题,因此再次进行一些简化是必要的。首先,只能查询特定标签集的模型可用性(没有 "give me all globally available slots"),其次只能以特定粒度(所需槽长度)查询。这非常适合我的特定用例,我只需要向用户提供他们可以保留的插槽列表,例如 9:15-9:30、9:30-9: 45等。这通过重用上面的代码使算法非常简单:

def free_slots(self, start: datetime, end: datetime, attributes: set, granularity: timedelta):
    slots = []
    while start < end:
        slot_end = start + granularity
        if self.timeslot_is_available(start, slot_end, attributes):
            slots.append((start, slot_end))
        start += granularity

    return slots

换句话说,它只是在给定的时间间隔内遍历所有可能的插槽,并从字面上检查该插槽是否可用。这是一个有点暴力的解决方案,但工作得很好。