ORTools 任务分配优化与持续时间

ORTools task allocation optimization with duration

我正在使用类似 https://github.com/google/or-tools/blob/master/examples/python/task_allocation_sat.py 的 or-tools 解决方案将任务分配到时隙,其中每个任务都有周期性并且每个时隙都有容量(最多可以在时隙内放置多少任务)。现在我想用基于任务持续时间的约束替换容量约束,其中每个任务都有持续时间,每个时隙都有最大持续时间,所以每个时隙只能有与其最大持续时间限制一样多的任务。但我不明白如何建立约束,即“检查一个槽中任务的持续时间总和,它应该小于 max_slot_duration”。

def main():
    available = [
        [1, 0, 0, 0, 0, 1, 0],
        [1, 1, 1, 1, 1, 0, 0],
        [0, 1, 1, 1, 1, 0, 0],
        [0, 0, 1, 1, 1, 0, 0],
        [0, 0, 1, 1, 1, 0, 0],
    ]

    periodicity = [
        2, 2, 1, 1, 3
    ]

    capacity = 3

    max_slot_duration = 124

    task_durations = [
        15, 20, 30, 50, 10
    ]

    ntasks = len(available)
    nslots = len(available[0])

    all_tasks = range(ntasks)
    all_slots = range(nslots)

    model = cp_model.CpModel()
    assign = {}
    for task in all_tasks:
        for slot in all_slots:
            assign[(task, slot)] = model.NewBoolVar('x[%i][%i]' % (task, slot))
    count = model.NewIntVar(0, nslots, 'count')
    slot_used = [model.NewBoolVar('slot_used[%i]' % s) for s in all_slots]

    for task in all_tasks:
        model.Add(
            sum(assign[(task, slot)] for slot in all_slots if available[task][slot] == 1) == periodicity[task])

    for slot in all_slots:
        model.Add(
            sum(assign[(task, slot)] for task in all_tasks
                if available[task][slot] == 1) <= capacity)
        for task in all_tasks:
            if available[task][slot] == 1:
                model.AddImplication(slot_used[slot].Not(),
                                     assign[(task, slot)].Not())
            else:
                model.Add(assign[(task, slot)] == 0)

    model.Add(count == sum(slot_used))

    model.Minimize(count)

    solver = cp_model.CpSolver()
    solver.parameters.log_search_progress = True
    solver.parameters.num_search_workers = 6
    solution_printer = TaskAssigningSolutionPrinter(all_tasks, all_slots, assign)
    status = solver.Solve(model, solution_printer)
    print(solution_printer.get_solution())

和解打印机

class TaskAssigningSolutionPrinter(cp_model.CpSolverSolutionCallback):
    def __init__(self, tasks, slots, assign):
        cp_model.CpSolverSolutionCallback.__init__(self)
        self.__tasks = tasks
        self.__slots = slots
        self.__assign = assign
        self.__solutions = {}
        self.__solution_count = 0

    def on_solution_callback(self):
        self.__solutions[self.__solution_count] = {}
        for slot in self.__slots:
            self.__solutions[self.__solution_count][slot] = []
            for task in self.__tasks:
                if self.Value(self.__assign[(task, slot)]) == 1:
                    self.__solutions[self.__solution_count][slot].append(task)
        self.__solution_count += 1

    def get_solution(self):
        return self.__solutions

感谢 or-tools discord 频道解决方案的 Stradivari (Xiang) 用户。 我们这里只需要求和约束。

for slot in all_slots:
  model.Add(sum(assign[(task, slot)] * task_durations[task] for task in all_tasks if
                      available[task][slot] == 1) <= max_slot_duration * workers[slot])