车间调度问题

一个常见的调度问题是作业车间调度问题,多个作业在多台机器上处理。每个作业由一系列任务组成,这些任务必须按照给定的顺序执行,并且每个任务必须在特定的机器上处理。例如,作业可以是制造单一的消费品,如汽车。问题是如何安排机器上的任务,以最小化调度的长度,即所有任务完成所需的时间。

作业车间问题由几个约束条件:

  • 在作业的前一个任务完成之前,作业的任何任务不能启动
  • 一台机器一次自能做一项任务
  • 一项任务,一旦开始,必须运行到完成

例子

下面是一个作业车间调度问题的简单例子,其中每个作业都用一对数字(m, p)来表示,其中m是必须处理该任务的机器编号,p是该任务的处理时间。(作业和机器的编号从0开始)

  • job 0 = [(0, 3), (1, 2), (2, 2)]
  • job 1 = [(0, 2), (2, 1), (1, 4)]
  • job 2 = [(1, 4), (2, 3)]

在本例中,作业0有三个任务,第一个(0,3)表示必须在机器0上在3个单位时间内处理完成。第二个(1,2)表示必须在机器1上在2个单位时间内处理完成……这里总共有8个任务。

解决方案

这个作业车间问题的一个解决方案是为每个任务分配一个开始时间,满足上面给出的约束条件。下面的图表显示了这个问题的一个可行的解决方案:

schedule1

可以检查每个作业的任务是否按照问题给出的顺序以不重叠的时间间隔调度。

这个解决方案的时间长度是12,这是所有三个作业都完成的时间。然而,你可以在下面看到,这并不是这个问题的最佳解决方案。

变量和约束

本节介绍如何设置问题的变量和约束。首先,task(i, j)表示job i中第j个任务。比如,task(0,2)代表job 0中第二个任务,对应问题描述中的(1, 2)。

接下来,定义 $t{i,j}$为task(i,j)的开始时间, $t{i,j}$是作业车间问题中的变量。找到解决方案包括确定这些变量的值,以满足问题的需求。

作业调度问题有两种约束:

  • 优先约束——这些约束来自于这样的条件:在同一个作业中,任何两个连续的任务必须在第二个任务开始之前完成第一个任务。例如,task(0, 2)和task(0, 3)是job 0的连续任务。由于task(0, 2)的处理时间是2,task(0, 3)的开始时间必须在task2开始时间之后至少2个单位时间(也许task2是粉刷一扇门,油漆需要两个小时才能变干)。所以,你得到以下约束:
  • 没有重叠约束——这是由于一台机器不能同时处理两个任务的限制产生的。例如,task(0, 2)和task(2, 1)都在机器1上处理。由于它们的处理时间分别为2和4,因此,必须保持下面的约束:

如果task(0, 2)在task(2, 1)前调度

或者

如果task(2, 1)在task(0, 2)前调度

问题目标

作业车间调度问题的目标是最小化最大完工时间,即从作业最早开始时间到最后结束时间的时间长度。

代码

下面的部分描述了解决作业车间问题的代码主要部分。

导入依赖库

下面的代码导入需要的依赖库

1
2
import collections
from ortools.sat.python import cp_model

定义数据

接下来,定义问题的数据

1
2
3
4
5
6
7
8
9
10
jobs_data = [  # task = (machine_id, processing_time).
[(0, 3), (1, 2), (2, 2)], # Job0
[(0, 2), (2, 1), (1, 4)], # Job1
[(1, 4), (2, 3)] # Job2
]

machines_count = 1 + max(task[0] for job in jobs_data for task in job)
all_machines = range(machines_count)
# 计算所有持续时间的总和
horizon = sum(task[1] for job in jobs_data for task in job)

模型创建

接下来创建问题的模型

1
model = cp_model.CpModel()

变量定义

接下来定义问题的变量

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
# 存储创建变量的信息
task_type = collections.namedtuple('task_type', 'start end interval')
# 解决方案信息
assigned_task_type = collections.namedtuple('assigned_task_type',
'start job index duration')

# 创建job时间间隔变量,加上机器列表
all_tasks = {}
machine_to_intervals = collections.defaultdict(list)

for job_id, job in enumerate(jobs_data):
for task_id, task in enumerate(job):
machine = task[0]
duration = task[1]
suffix = '_%i_%i' % (job_id, task_id)
start_var = model.NewIntVar(0, horizon, 'start' + suffix)
end_var = model.NewIntVar(0, horizon, 'end' + suffix)
interval_var = model.NewIntervalVar(start_var, duration, end_var,
'interval' + suffix)
all_tasks[job_id, task_id] = task_type(start=start_var,
end=end_var,
interval=interval_var)
machine_to_intervals[machine].append(interval_var)

对于每个job和task,使用求解器的NewIntVar来创建变量:

  • start_var: task开始时间
  • end_var: task结束时间
    start_varend_var的上界是horizon,所有job所有task处理时间的总和。horizon足够大,可以完成所有的任务,原因在于:如果你在非重叠时间间隔安排任务(一个非最优的解决方案),调度的时间长度正好是horizon。所以最优解的时间长度不能大于horizon

接下来,使用NewIntervalVar方法为task创建interval variable——它的值是一个可变的时间间隔。NewIntervalVar输入如下:

  • start_var:task开始时间变量
  • duration:task时间间隔长度
  • end_var:task结束时间间隔变量
  • 'interval_%i_%i' % (job, task_id)):这个变量的名字

在任意解决方案中,end_var减去start_var必须等于duration

约束定义

接下来定义问题的约束。

1
2
3
4
5
6
7
8
9
# 创建无重叠约束
for machine in all_machines:
model.AddNoOverlap(machine_to_intervals[machine])

# job内部的优先顺序
for job_id, job in enumerate(jobs_data):
for task_id in range(len(job) - 1):
model.Add(all_tasks[job_id, task_id +
1].start >= all_tasks[job_id, task_id].end)

程序中使用求解器的AddNoOverlap方法来创建无重叠约束,从而防止同一台机器的任务在时间上重叠。

接下来,程序添加了优先约束,以防止同一job的连续任务在时间上重叠。对每一个job,下面这行代码

1
model.Add(all_tasks[job, task_id + 1].start >= all_tasks[job, task_id].end)

要求一个task的结束时间必须发生在下一个task的开始时间前。

定义优化目标

下面的代码定义这个问题的优化目标。

1
2
3
4
5
6
7
# 最大完工时间
obj_var = model.NewIntVar(0, horizon, 'makespan')
model.AddMaxEquality(obj_var, [
all_tasks[job_id, len(job) - 1].end
for job_id, job in enumerate(jobs_data)
])
model.Minimize(obj_var)

这个表达式

1
2
3
model.AddMaxEquality(
obj_var,
[all_tasks[(job, len(jobs_data[job]) - 1)].end for job in all_jobs])

创建变量obj_var,值代表所有job的最大结束时间。

调用求解器

下面的代码调用求解器。

1
2
solver = cp_model.CpSolver()
status = solver.Solve(model)

输出结果

下面的代码显示结果,包括最佳计划和task间隔。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
if status == cp_model.OPTIMAL or status == cp_model.FEASIBLE:
print('Solution:')
# 每个机器创建一个分配task的列表
assigned_jobs = collections.defaultdict(list)
for job_id, job in enumerate(jobs_data):
for task_id, task in enumerate(job):
machine = task[0]
assigned_jobs[machine].append(
assigned_task_type(start=solver.Value(
all_tasks[job_id, task_id].start),
job=job_id,
index=task_id,
duration=task[1]))

# 每个机器的输出
output = ''
for machine in all_machines:
# 开始时间排序
assigned_jobs[machine].sort()
sol_line_tasks = 'Machine ' + str(machine) + ': '
sol_line = ' '

for assigned_task in assigned_jobs[machine]:
name = 'job_%i_task_%i' % (assigned_task.job,
assigned_task.index)
# 增加空格
sol_line_tasks += '%-15s' % name

start = assigned_task.start
duration = assigned_task.duration
sol_tmp = '[%i,%i]' % (start, start + duration)
# 增加空格
sol_line += '%-15s' % sol_tmp

sol_line += '\n'
sol_line_tasks += '\n'
output += sol_line_tasks
output += sol_line

# 输出找到的解决方案
print(f'Optimal Schedule Length: {solver.ObjectiveValue()}')
print(output)
else:
print('No solution found.')

最优调度如下所示:

1
2
3
4
5
6
7
Optimal Schedule Length: 11
Machine 0: job_0_0 job_1_0
[0,3] [3,5]
Machine 1: job_2_0 job_0_1 job_1_2
[0,4] [4,6] [7,11]
Machine 2: job_1_1 job_0_2 job_2_1
[5,6] [6,8] [8,11]

schedule2

眼尖的读者在检查机器1的时候可能会想,为什么job_1_2被安排在时间7而不是时间6。两个都是有效的解决方案,但请记住:目标式最小化完工时间。把job_1_2提前不会减少最大完工时间,因此从求解器的角度,这两个解决方案是相等的。

完整程序

最后,下面是这个作业车间调度问题的完整程序。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
"""Minimal jobshop example."""
import collections
from ortools.sat.python import cp_model


def main():
"""Minimal jobshop problem."""
# 数据
jobs_data = [ # task = (machine_id, processing_time).
[(0, 3), (1, 2), (2, 2)], # Job0
[(0, 2), (2, 1), (1, 4)], # Job1
[(1, 4), (2, 3)] # Job2
]

machines_count = 1 + max(task[0] for job in jobs_data for task in job)
all_machines = range(machines_count)
# 计算所有持续时间的总和
horizon = sum(task[1] for job in jobs_data for task in job)

# 创建模型
model = cp_model.CpModel()

# 存储创建变量的信息
task_type = collections.namedtuple('task_type', 'start end interval')
# 解决方案信息
assigned_task_type = collections.namedtuple('assigned_task_type',
'start job index duration')

# 创建job时间间隔变量,加上机器列表
all_tasks = {}
machine_to_intervals = collections.defaultdict(list)

for job_id, job in enumerate(jobs_data):
for task_id, task in enumerate(job):
machine = task[0]
duration = task[1]
suffix = '_%i_%i' % (job_id, task_id)
start_var = model.NewIntVar(0, horizon, 'start' + suffix)
end_var = model.NewIntVar(0, horizon, 'end' + suffix)
interval_var = model.NewIntervalVar(start_var, duration, end_var,
'interval' + suffix)
all_tasks[job_id, task_id] = task_type(start=start_var,
end=end_var,
interval=interval_var)
machine_to_intervals[machine].append(interval_var)

# 创建无重叠约束
for machine in all_machines:
model.AddNoOverlap(machine_to_intervals[machine])

# job内部的优先顺序
for job_id, job in enumerate(jobs_data):
for task_id in range(len(job) - 1):
model.Add(all_tasks[job_id, task_id +
1].start >= all_tasks[job_id, task_id].end)

# 最大完工时间
obj_var = model.NewIntVar(0, horizon, 'makespan')
model.AddMaxEquality(obj_var, [
all_tasks[job_id, len(job) - 1].end
for job_id, job in enumerate(jobs_data)
])
model.Minimize(obj_var)

# 创建求解器并求解
solver = cp_model.CpSolver()
status = solver.Solve(model)

if status == cp_model.OPTIMAL or status == cp_model.FEASIBLE:
print('Solution:')
# Create one list of assigned tasks per machine.
assigned_jobs = collections.defaultdict(list)
for job_id, job in enumerate(jobs_data):
for task_id, task in enumerate(job):
machine = task[0]
assigned_jobs[machine].append(
assigned_task_type(start=solver.Value(
all_tasks[job_id, task_id].start),
job=job_id,
index=task_id,
duration=task[1]))

# 每个机器创建一个分配task的列表
output = ''
for machine in all_machines:
# Sort by starting time.
assigned_jobs[machine].sort()
sol_line_tasks = 'Machine ' + str(machine) + ': '
sol_line = ' '

for assigned_task in assigned_jobs[machine]:
name = 'job_%i_task_%i' % (assigned_task.job,
assigned_task.index)
# 增加空格
sol_line_tasks += '%-15s' % name

start = assigned_task.start
duration = assigned_task.duration
sol_tmp = '[%i,%i]' % (start, start + duration)
# 增加空格
sol_line += '%-15s' % sol_tmp

sol_line += '\n'
sol_line_tasks += '\n'
output += sol_line_tasks
output += sol_line

# 输出找到的解决方案
print(f'Optimal Schedule Length: {solver.ObjectiveValue()}')
print(output)
else:
print('No solution found.')

# 数据统计
print('\nStatistics')
print(' - conflicts: %i' % solver.NumConflicts())
print(' - branches : %i' % solver.NumBranches())
print(' - wall time: %f s' % solver.WallTime())


if __name__ == '__main__':
main()

您的支持将鼓励我继续创作!