使用 Optapy 的优化算法、约束和分数计算配置

Optimisation algorithm, constraints and score calculation configurations with Optapy

我在 Python 中使用 OptaPy 库,并以 GitHub 上的学校时间表(school timetabling)示例作为基础。我对该库的配置有几个问题:

我正在查阅 OptaPlanner 用户指南,但不确定如何在 Python 中实现这些配置。

感谢指导。

可以通过 OptaPy 的编程式 API 进行配置。配置类位于 optapy.config 包中。特别是,您可以通过 withPhases 选择优化算法:
import optapy.config
# Solver configuration with a termination spent limit of 30 seconds and two
# explicit phases: a construction heuristic followed by a local search whose
# acceptor is given a simulated-annealing starting temperature.
solver_config = (
    optapy.config.solver.SolverConfig()
    .withEntityClasses(get_class(Lesson))
    .withSolutionClass(get_class(TimeTable))
    .withConstraintProviderClass(get_class(define_constraints))
    .withTerminationSpentLimit(Duration.ofSeconds(30))
    .withPhases([
        optapy.config.constructionheuristic.ConstructionHeuristicPhaseConfig(),
        optapy.config.localsearch.LocalSearchPhaseConfig().withAcceptorConfig(
            optapy.config.localsearch.decider.acceptor.LocalSearchAcceptorConfig()
            .withSimulatedAnnealingStartingTemperature("0hard/0soft")
        ),
    ])
)

(以上配置使用的是模拟退火算法)

最近添加了 @easy_score_calculator 和 @incremental_score_calculator 装饰器,它们分别允许您定义 EasyScoreCalculator 或 IncrementalScoreCalculator。例如(EasyScoreCalculator,最大化取值):

@optapy.easy_score_calculator
def my_score_calculator(solution: Solution):
    """Score a solution as the sum of its entities' values.

    Entities whose ``value`` is ``None`` contribute nothing to the total.
    Returns the total wrapped in a ``SimpleScore``.
    """
    assigned_values = (
        entity.value
        for entity in solution.entity_list
        if entity.value is not None
    )
    return optapy.score.SimpleScore.of(sum(assigned_values))

# Termination settings: a best-score limit of '9'.
termination_config = optapy.config.solver.termination.TerminationConfig()
termination_config.setBestScoreLimit('9')

# Solver configuration that scores solutions with my_score_calculator.
solver_config = optapy.config.solver.SolverConfig()
(
    solver_config
    .withSolutionClass(optapy.get_class(Solution))
    .withEntityClasses(optapy.get_class(Entity))
    .withEasyScoreCalculatorClass(optapy.get_class(my_score_calculator))
    .withTerminationConfig(termination_config)
)

或使用 IncrementalScoreCalculator (NQueens):

@optapy.incremental_score_calculator
class IncrementalScoreCalculator:
    """Incremental score calculator for the N-Queens example.

    For every row and for both diagonal directions it keeps the list of
    queens currently placed there. Inserting a queen subtracts from the score
    the number of queens already sharing each of its three lines; retracting
    a queen adds the same amounts back. The score is therefore the negated
    number of queen pairs that share a row or a diagonal.

    NOTE(review): the camelCase method names are presumably required by the
    calculator interface the decorator binds to - do not rename them.
    """

    # Running score; reset to 0 by resetWorkingSolution.
    score: int
    # row index -> queens currently in that row
    row_index_map: dict
    # ascending diagonal index -> queens currently on that diagonal
    ascending_diagonal_index_map: dict
    # descending diagonal index -> queens currently on that diagonal
    descending_diagonal_index_map: dict

    def resetWorkingSolution(self, working_solution: Solution):
        """Rebuild all index maps and the score from ``working_solution``."""
        n = working_solution.n
        self.row_index_map = {i: [] for i in range(n)}
        # Ascending diagonal indices span 0 .. 2n-2.
        self.ascending_diagonal_index_map = {i: [] for i in range(2 * n - 1)}
        # Descending diagonal indices span -(n-1) .. n-1.
        self.descending_diagonal_index_map = {i: [] for i in range(1 - n, n)}
        self.score = 0
        for queen in working_solution.queen_list:
            self.insert(queen)

    def beforeEntityAdded(self, entity: object):
        """Nothing to do before an entity is added."""
        pass

    def afterEntityAdded(self, entity: object):
        """Account for a newly added entity."""
        self.insert(entity)

    def beforeVariableChanged(self, entity: object, variableName: str):
        """Remove the entity's contribution while it still has its old value."""
        self.retract(entity)

    def afterVariableChanged(self, entity: object, variableName: str):
        """Re-add the entity's contribution using its new value."""
        self.insert(entity)

    def beforeEntityRemoved(self, entity: object):
        """Remove the entity's contribution before it disappears."""
        self.retract(entity)

    def afterEntityRemoved(self, entity: object):
        """Nothing to do after an entity is removed."""
        pass

    def _index_lists(self, queen: Queen):
        """Return the row, ascending and descending queen lists for ``queen``.

        All three lookups happen before the caller mutates anything, so an
        unknown index raises ``KeyError`` without leaving the score
        half-updated.
        """
        return (
            self.row_index_map[queen.row],
            self.ascending_diagonal_index_map[queen.getAscendingDiagonalIndex()],
            self.descending_diagonal_index_map[queen.getDescendingDiagonalIndex()],
        )

    def insert(self, queen: Queen):
        """Add ``queen`` to the index maps and penalize its conflicts.

        A queen whose ``row`` is ``None`` is ignored.
        """
        if queen.row is None:
            return
        for queens_on_line in self._index_lists(queen):
            # One penalty point per queen already on the same line.
            self.score -= len(queens_on_line)
            queens_on_line.append(queen)

    def retract(self, queen: Queen):
        """Remove ``queen`` from the index maps and undo its penalties.

        A queen whose ``row`` is ``None`` is ignored.
        """
        if queen.row is None:
            return
        for queens_on_line in self._index_lists(queen):
            queens_on_line.remove(queen)
            # Give back one point per queen that remains on the line.
            self.score += len(queens_on_line)

    def calculateScore(self) -> optapy.score.SimpleScore:
        """Return the current running score as a ``SimpleScore``."""
        return optapy.score.SimpleScore.of(self.score)

# Termination settings: a best-score limit of '0'.
termination_config = optapy.config.solver.termination.TerminationConfig()
termination_config.setBestScoreLimit('0')

# Solver configuration that scores solutions with IncrementalScoreCalculator,
# registered through a score director factory configuration.
solver_config = optapy.config.solver.SolverConfig()
(
    solver_config
    .withSolutionClass(optapy.get_class(Solution))
    .withEntityClasses(optapy.get_class(Queen))
    .withScoreDirectorFactory(
        optapy.config.score.director.ScoreDirectorFactoryConfig()
        .withIncrementalScoreCalculatorClass(optapy.get_class(IncrementalScoreCalculator))
    )
    .withTerminationConfig(termination_config)
)

如果您所说的权重是指 ConstraintConfiguration(它允许您为每个问题定义自定义的约束权重),该功能尚未在 OptaPy 中公开。如果您的意思是如何让某个约束的权重更大或更小,可以修改 penalize/reward 的第二个参数(如果权重是常量),或者添加用于计算约束乘数的第三个参数(第二个参数将与该乘数相乘),像这样:

def undesired_day_for_employee(constraint_factory: ConstraintFactory):
    """Build the 'Undesired day for employee' soft constraint.

    Pairs each shift with the availabilities of the same employee on the
    shift's start date, keeps the pairs whose availability type is
    ``UNDESIRED``, and penalizes each pair with a soft weight of 2 multiplied
    by the shift duration in minutes.
    """
    same_employee = Joiners.equal(lambda shift: shift.employee,
                                  lambda availability: availability.employee)
    same_date = Joiners.equal(lambda shift: shift.start.date(),
                              lambda availability: availability.date)

    shift_availability_pairs = constraint_factory.forEach(shift_class).join(
        availability_class, [same_employee, same_date]
    )
    undesired_pairs = shift_availability_pairs.filter(
        lambda shift, availability: availability.availability_type == AvailabilityType.UNDESIRED
    )
    return undesired_pairs.penalize(
        'Undesired day for employee',
        HardSoftScore.ofSoft(2),
        lambda shift, availability: get_shift_duration_in_minutes(shift),
    )

(该约束对员工在其不希望工作的日期每工作一分钟处以 2 分的软惩罚)