ORTools 任务分配优化与持续时间
ORTools task allocation optimization with duration
我正在使用类似 https://github.com/google/or-tools/blob/master/examples/python/task_allocation_sat.py 的 or-tools 解决方案将任务分配到时隙,其中每个任务都有周期性并且每个时隙都有容量(最多可以在时隙内放置多少任务)。现在我想用基于任务持续时间的约束替换容量约束,其中每个任务都有持续时间,每个时隙都有最大持续时间,所以每个时隙只能有与其最大持续时间限制一样多的任务。但我不明白如何建立约束,即“检查一个槽中任务的持续时间总和,它应该小于 max_slot_duration”。
def main():
available = [
[1, 0, 0, 0, 0, 1, 0],
[1, 1, 1, 1, 1, 0, 0],
[0, 1, 1, 1, 1, 0, 0],
[0, 0, 1, 1, 1, 0, 0],
[0, 0, 1, 1, 1, 0, 0],
]
periodicity = [
2, 2, 1, 1, 3
]
capacity = 3
max_slot_duration = 124
task_durations = [
15, 20, 30, 50, 10
]
ntasks = len(available)
nslots = len(available[0])
all_tasks = range(ntasks)
all_slots = range(nslots)
model = cp_model.CpModel()
assign = {}
for task in all_tasks:
for slot in all_slots:
assign[(task, slot)] = model.NewBoolVar('x[%i][%i]' % (task, slot))
count = model.NewIntVar(0, nslots, 'count')
slot_used = [model.NewBoolVar('slot_used[%i]' % s) for s in all_slots]
for task in all_tasks:
model.Add(
sum(assign[(task, slot)] for slot in all_slots if available[task][slot] == 1) == periodicity[task])
for slot in all_slots:
model.Add(
sum(assign[(task, slot)] for task in all_tasks
if available[task][slot] == 1) <= capacity)
for task in all_tasks:
if available[task][slot] == 1:
model.AddImplication(slot_used[slot].Not(),
assign[(task, slot)].Not())
else:
model.Add(assign[(task, slot)] == 0)
model.Add(count == sum(slot_used))
model.Minimize(count)
solver = cp_model.CpSolver()
solver.parameters.log_search_progress = True
solver.parameters.num_search_workers = 6
solution_printer = TaskAssigningSolutionPrinter(all_tasks, all_slots, assign)
status = solver.Solve(model, solution_printer)
print(solution_printer.get_solution())
和解打印机
class TaskAssigningSolutionPrinter(cp_model.CpSolverSolutionCallback):
def __init__(self, tasks, slots, assign):
cp_model.CpSolverSolutionCallback.__init__(self)
self.__tasks = tasks
self.__slots = slots
self.__assign = assign
self.__solutions = {}
self.__solution_count = 0
def on_solution_callback(self):
self.__solutions[self.__solution_count] = {}
for slot in self.__slots:
self.__solutions[self.__solution_count][slot] = []
for task in self.__tasks:
if self.Value(self.__assign[(task, slot)]) == 1:
self.__solutions[self.__solution_count][slot].append(task)
self.__solution_count += 1
def get_solution(self):
return self.__solutions
感谢 or-tools discord 频道解决方案的 Stradivari (Xiang) 用户。
我们这里只需要求和约束。
for slot in all_slots:
model.Add(sum(assign[(task, slot)] * task_durations[task] for task in all_tasks if
available[task][slot] == 1) <= max_slot_duration * workers[slot])
我正在使用类似 https://github.com/google/or-tools/blob/master/examples/python/task_allocation_sat.py 的 or-tools 解决方案将任务分配到时隙,其中每个任务都有周期性并且每个时隙都有容量(最多可以在时隙内放置多少任务)。现在我想用基于任务持续时间的约束替换容量约束,其中每个任务都有持续时间,每个时隙都有最大持续时间,所以每个时隙只能有与其最大持续时间限制一样多的任务。但我不明白如何建立约束,即“检查一个槽中任务的持续时间总和,它应该小于 max_slot_duration”。
def main():
available = [
[1, 0, 0, 0, 0, 1, 0],
[1, 1, 1, 1, 1, 0, 0],
[0, 1, 1, 1, 1, 0, 0],
[0, 0, 1, 1, 1, 0, 0],
[0, 0, 1, 1, 1, 0, 0],
]
periodicity = [
2, 2, 1, 1, 3
]
capacity = 3
max_slot_duration = 124
task_durations = [
15, 20, 30, 50, 10
]
ntasks = len(available)
nslots = len(available[0])
all_tasks = range(ntasks)
all_slots = range(nslots)
model = cp_model.CpModel()
assign = {}
for task in all_tasks:
for slot in all_slots:
assign[(task, slot)] = model.NewBoolVar('x[%i][%i]' % (task, slot))
count = model.NewIntVar(0, nslots, 'count')
slot_used = [model.NewBoolVar('slot_used[%i]' % s) for s in all_slots]
for task in all_tasks:
model.Add(
sum(assign[(task, slot)] for slot in all_slots if available[task][slot] == 1) == periodicity[task])
for slot in all_slots:
model.Add(
sum(assign[(task, slot)] for task in all_tasks
if available[task][slot] == 1) <= capacity)
for task in all_tasks:
if available[task][slot] == 1:
model.AddImplication(slot_used[slot].Not(),
assign[(task, slot)].Not())
else:
model.Add(assign[(task, slot)] == 0)
model.Add(count == sum(slot_used))
model.Minimize(count)
solver = cp_model.CpSolver()
solver.parameters.log_search_progress = True
solver.parameters.num_search_workers = 6
solution_printer = TaskAssigningSolutionPrinter(all_tasks, all_slots, assign)
status = solver.Solve(model, solution_printer)
print(solution_printer.get_solution())
和解打印机
class TaskAssigningSolutionPrinter(cp_model.CpSolverSolutionCallback):
def __init__(self, tasks, slots, assign):
cp_model.CpSolverSolutionCallback.__init__(self)
self.__tasks = tasks
self.__slots = slots
self.__assign = assign
self.__solutions = {}
self.__solution_count = 0
def on_solution_callback(self):
self.__solutions[self.__solution_count] = {}
for slot in self.__slots:
self.__solutions[self.__solution_count][slot] = []
for task in self.__tasks:
if self.Value(self.__assign[(task, slot)]) == 1:
self.__solutions[self.__solution_count][slot].append(task)
self.__solution_count += 1
def get_solution(self):
return self.__solutions
感谢 or-tools discord 频道解决方案的 Stradivari (Xiang) 用户。 我们这里只需要求和约束。
for slot in all_slots:
model.Add(sum(assign[(task, slot)] * task_durations[task] for task in all_tasks if
available[task][slot] == 1) <= max_slot_duration * workers[slot])