作业车间调度的弧约束

arcs constraints for jobshop Scheduling

我使用 Python 的 OR-Tools 优化库构建了一个作业车间调度模型。问题是,当我构建带有设置时间(setup time)的柔性作业车间模型时,它无法正常工作。我认为问题出在我构建的弧(arcs)上;如果有人能进一步解释一下回路约束(circuit constraint),会对我很有帮助。顺便说一下,当只使用一台机器时,模型可以正常工作。

代码:

from __future__ import print_function

import collections

from google.protobuf import text_format
from ortools.sat.python import cp_model

# Import Python wrapper for or-tools CP-SAT solver.
from ortools.sat.python import cp_model

# Intermediate solution printer
class SolutionPrinter(cp_model.CpSolverSolutionCallback):
    """Solution callback that logs one line per solution reported by the solver."""

    def __init__(self):
        # Explicit base-class call (rather than super()) keeps the class usable
        # on both Python 2 and 3, which the file's __future__ import targets.
        cp_model.CpSolverSolutionCallback.__init__(self)
        # Number of solutions reported so far; also the index printed next.
        self.__solution_count = 0

    def on_solution_callback(self):
        """Print the solution index, WallTime() and ObjectiveValue(), then count it."""
        index = self.__solution_count
        elapsed = self.WallTime()
        objective = self.ObjectiveValue()
        print('Solution %i, time = %f s, objective = %i' %
              (index, elapsed, objective))
        self.__solution_count = index + 1

def MinimalJobshopSat():
    """Build, solve and print a small job-shop model with setup times.

    The instance (3 jobs x 3 machines) is hard-coded.  For every machine a
    NoOverlap constraint keeps tasks disjoint, and a circuit constraint over
    the machine's tasks (plus a dummy node 0) selects the processing order so
    that sequence-dependent setup times can be enforced between consecutive
    tasks.  The objective is the makespan.

    The model is solved once, with a SolutionPrinter callback, under a 60 s
    time limit.  If a solution is found, the per-machine schedule is printed.

    Returns:
        None.  All results are written to stdout.
    """
    # Create the model.
    model = cp_model.CpModel()

    # jobs_data[job] is the ordered list of (machine, duration) tasks.
    jobs_data = [
        [(0, 2546), (1, 2000), (2, 1400)],
        [(0, 1289), (1, 2546), (2, 2546)],
        [(0, 2839), (1, 1576), (2, 1200)],
    ]

    # setup_times[machine][i][j]: setup needed on `machine` when the task of
    # job j directly follows the task of job i.
    setup_times = [
        [
            [3559, 1638, 2000],
            [1442, 3010, 1641],
            [1728, 3583, 3243],
        ],
        [
            [3559, 1638, 2000],
            [1442, 3010, 1641],
            [1728, 3583, 3243],
        ],
        [
            [3559, 1638, 2000],
            [1442, 3010, 1641],
            [1728, 3583, 3243],
        ],
    ]

    all_jobs = range(len(jobs_data))

    machines_count = 1 + max(task[0] for job in jobs_data for task in job)
    all_machines = range(machines_count)

    # Move the part of the incoming setup that is paid whatever the
    # predecessor is into the task duration, and shrink the matrix to match.
    # NOTE: `jobs_data[job_id][machine]` relies on task k of every job running
    # on machine k, which holds for the data above.
    # NOTE(review): this also charges the folded setup to the first task on a
    # machine, which has no predecessor -- confirm that an initial setup is
    # intended.
    for machine in all_machines:
        for job_id in all_jobs:
            min_incoming_setup = min(
                setup_times[machine][j][job_id] for j in all_jobs)
            if min_incoming_setup == 0:
                continue

            print('job %i at machine %i has a min incoming setup of %i' %
                  (job_id, machine, min_incoming_setup))
            # We can transfer some setup times to the duration of the job.
            jobs_data[job_id][machine] = (
                machine, jobs_data[job_id][machine][1] + min_incoming_setup)
            # Decrease corresponding incoming setup times.
            for j in all_jobs:
                setup_times[machine][j][job_id] -= min_incoming_setup

    # Horizon: sum of all durations plus, for every (machine, predecessor)
    # row, the largest remaining setup time.
    horizon = sum(task[1] for job in jobs_data for task in job)
    for machine_setups in setup_times:
        for row in machine_setups:
            horizon += max(row)

    # Named tuple to store information about created variables.
    task_type = collections.namedtuple('task_type', 'start end interval')
    # Named tuple to manipulate solution information.
    assigned_task_type = collections.namedtuple('assigned_task_type',
                                                'start job index duration')

    # Creates job intervals and add to the corresponding machine lists.
    # starts[machine][k] / ends[machine][k] are filled in job order, so with
    # exactly one task per job on each machine, index k is the job id.
    all_tasks = {}
    machine_to_intervals = collections.defaultdict(list)
    starts = collections.defaultdict(list)
    ends = collections.defaultdict(list)

    for job_id, job in enumerate(jobs_data):
        for task_id, task in enumerate(job):
            machine = task[0]
            duration = task[1]
            suffix = '_%i_%i' % (job_id, task_id)
            start_var = model.NewIntVar(0, horizon, 'start' + suffix)
            end_var = model.NewIntVar(0, horizon, 'end' + suffix)
            interval_var = model.NewIntervalVar(start_var, duration, end_var,
                                                'interval' + suffix)
            all_tasks[job_id, task_id] = task_type(
                start=start_var, end=end_var, interval=interval_var)
            machine_to_intervals[machine].append(interval_var)
            starts[machine].append(start_var)
            ends[machine].append(end_var)

    # Create and add disjunctive constraints.
    for machine in all_machines:
        model.AddNoOverlap(machine_to_intervals[machine])

    # Transition times using one circuit constraint per machine.
    # Node 0 is a dummy start/end node; node i + 1 is the task of job i.
    for machine in all_machines:
        arcs = []
        for i in all_jobs:
            # Arc dummy -> task i: task i is the first one on this machine.
            # Its start time is deliberately NOT pinned to 0: in a job shop
            # the first task on a machine may have to wait for the previous
            # task of its own job on another machine, so forcing start == 0
            # makes every multi-machine instance infeasible.
            arcs.append([0, i + 1, model.NewBoolVar('')])
            # Arc task i -> dummy: task i is the last one on this machine.
            arcs.append([i + 1, 0, model.NewBoolVar('')])

            for j in all_jobs:
                if i == j:
                    continue

                lit = model.NewBoolVar(
                    '%i_%i follows %i_%i' % (j, machine, i, machine))
                arcs.append([i + 1, j + 1, lit])

                # Reified precedence: when j directly follows i on this
                # machine, j starts after i ends plus the setup time.
                model.Add(
                    starts[machine][j] >=
                    ends[machine][i] + setup_times[machine][i][j]
                ).OnlyEnforceIf(lit)
        model.AddCircuit(arcs)

    # Precedences inside a job.
    for job_id, job in enumerate(jobs_data):
        for task_id in range(len(job) - 1):
            model.Add(all_tasks[job_id, task_id + 1].start >=
                      all_tasks[job_id, task_id].end)

    # Makespan objective.
    obj_var = model.NewIntVar(0, horizon, 'makespan')
    model.AddMaxEquality(obj_var, [
        all_tasks[job_id, len(job) - 1].end
        for job_id, job in enumerate(jobs_data)
    ])
    model.Minimize(obj_var)

    # Solve the model once; the status of this solve decides what is printed.
    solver = cp_model.CpSolver()
    solver.parameters.max_time_in_seconds = 60
    solution_printer = SolutionPrinter()
    status = solver.SolveWithSolutionCallback(model, solution_printer)
    print(solver.ResponseStats())

    # Both OPTIMAL and FEASIBLE carry a usable solution.
    if status in (cp_model.OPTIMAL, cp_model.FEASIBLE):
        # Create one list of assigned tasks per machine.
        assigned_jobs = collections.defaultdict(list)
        for job_id, job in enumerate(jobs_data):
            for task_id, task in enumerate(job):
                machine = task[0]
                assigned_jobs[machine].append(
                    assigned_task_type(
                        start=solver.Value(all_tasks[job_id, task_id].start),
                        job=job_id,
                        index=task_id,
                        duration=task[1]))

        # Create per machine output lines.
        output = ''
        for machine in all_machines:
            # Sort by starting time.
            assigned_jobs[machine].sort()
            sol_line_tasks = 'Machine ' + str(machine) + ': '
            sol_line = '           '

            for assigned_task in assigned_jobs[machine]:
                name = 'job_%i_%i' % (assigned_task.job, assigned_task.index)
                # Add spaces to output to align columns.
                sol_line_tasks += '%-10s' % name

                start = assigned_task.start
                duration = assigned_task.duration
                sol_tmp = '[%i,%i]' % (start, start + duration)
                # Add spaces to output to align columns.
                sol_line += '%-10s' % sol_tmp

            sol_line += '\n'
            sol_line_tasks += '\n'
            output += sol_line_tasks
            output += sol_line

        # Finally print the solution found; only call it optimal when the
        # solver proved optimality.
        label = 'Optimal' if status == cp_model.OPTIMAL else 'Best found'
        print('%s Schedule Length: %i' % (label, solver.ObjectiveValue()))
        print(output)
    else:
        print('No solution found.')
# Script entry point.  MinimalJobshopSat() returns None, so it must be called
# exactly once; calling its result again raises TypeError.
if __name__ == '__main__':
    MinimalJobshopSat()

如果任务是可选的,则需要在该任务对应的节点上添加一个自环弧(self-looping arc)。

假设 task_i 有一个布尔类型的存在文字(presence literal)lit_i,那么需要添加:

arcs.append([i + 1, i + 1, lit_i.Not()])