简单的资源可用性计划

simpy resource availability schedule

我在 Python 中使用 SimPy 创建一个离散事件模拟,它需要根据用户在 csv 文件中输入的计划提供资源。目的是表示一天中不同时间可用的相同资源(例如员工)的不同数量。据我所知,这不是基本 SimPy 中可用的东西——比如资源优先级。

我已经设法让它工作,并包含下面的代码来展示如何。但是我想问问社区是否有更好的方法在 SimPy 中实现这个功能?

下面的代码的工作原理是在每天开始时在资源不应该可用的时间请求资源 - 具有更高的优先级以确保他们获得资源。然后在适当的时间释放资源以供其他 events/processes 使用。正如我所说,它有效但似乎很浪费大量虚拟进程来确保资源的正确真实可用性。欢迎任何有助于改进的意见。

所以 csv 看起来像:

Number  time
0        23
50       22
100      17
50       10
20       8
5        6

其中数字表示在指定时间可用的员工数量。例如:6 点到 8 点有 5 名员工,8 点到 10 点有 20 名员工,10 点到 17 点有 50 名员工,依此类推,直到一天结束。

代码:

import csv
import simpy

# empty list ready to hold the input data in the csv
input_list = []

# a dummy process that "uses" staff until the end of the current day
def take_res():
    req = staff.request(priority=-100)
    yield req  # Request a staff resource at set priority
    yield test_env.timeout(24 - test_env.now)

# A dummy process that "uses" staff for the time those staff should not 
# be available for the real processes     
def request_res(delay, avail_time):
    req = staff.request(priority=-100)
    yield req  # Request a staff resource at set priority
    yield test_env.timeout(delay)
    yield staff.release(req)
    # pass time it is avail for
    yield test_env.timeout(avail_time)
    test_env.process(take_res())

# used to print current levels of resource usage
def print_usage():
    print('At time %0.2f %d res are in use' % (test_env.now, staff.count))
    yield test_env.timeout(0.5)
    test_env.process(print_usage())

# used to open the csv and read the data into a list
with open('staff_schedule.csv', mode="r") as infile:
    reader = csv.reader(infile)
    next(reader, None)  # ignore header
    for row in reader:
        input_list.append(row[:2])

    # calculates the time the current number of resources will be 
    # available for and adds to the list 

    i = 0
    for row in the_list:
        if i == 0:
            row.append(24 - int(input_list[i][1]))
        else:
            row.append(int(input_list[i-1][1]) - int(input_list[i][1]))

        i += 1

    # converts list to tuple of tuples to prevent any accidental 
    # edits from this point in
    staff_tuple = tuple(tuple(row) for row in input_list)
    print(staff_tuple)

# define environment and creates resources   
test_env = simpy.Environment()
staff = simpy.PriorityResource(test_env, capacity=sum(int(l[0]) for l in staff_tuple))

# for each row in the tuple run dummy processes to hold resources 
# according to schedule in the csv
for item in the_tuple:
    print(item[0])
    for i in range(int(item[0])):
        test_env.process(request_res(int(item[1]), int(item[2])))

# run event to print usage over time
test_env.process(print_usage())

# run for 25 hours - so 1 day
test_env.run(until=25)

SimPy相关

也许您可以使用 PreemptiveResource(参见 this example)。有了这个,每个资源只需要一个阻塞进程,因为它可以 "kick" 不太重要的进程。

Python相关

  • 记录您的代码。 take_res()request_res() 的目的是什么? (为什么两个函数都使用 priority=-100?)
  • 使用更好的名字。 the_listthe_tuple 不是很有帮助。
  • 您可以 the_list.append(row[:2]).
  • 而不是 the_list.append(row[0], row[1])
  • 为什么要将列表的列表转换为元组的元组?据我所知。但它增加了额外的代码,因此增加了额外的混乱和编程错误的可能性。
  • 你应该尽快离开 with open(file) 块(在你的情况下,在前四行之后)。没有必要让文件打开的时间超过必要的时间,当你完成对所有行的迭代后,你就不再需要它了。

这就是我为我的应用程序解决的方法。它并不完美,但考虑到我对 Python 和 SimPy 的基本技能,它是我能做的最好的。

结果是在所需时间可用的正确顾问数量。

首先,我定义了一个商店并将容量设置为等于模拟中将存在的顾问实例总数。

self.adviser_store = simpy.FilterStore(self.env,
capacity=self.total_ad_instances)

所需的 Adviser class 实例是在初始化步骤中创建的,为简洁起见,我没有包括在内。我实际上使用 JSON 文件来自定义各个顾问实例,然后将其放入列表中。

下面 class 定义中的 运行 参数实际上是另一个 class,它包含与当前 运行 模拟相关的所有信息 - 例如它包含模拟的开始和结束日期。 self.start_date 因此定义顾问开始工作的日期。 self.run.start_date 是模拟的开始日期。

class Adviser(object):

    def __init__(self, run, id_num, start_time, end_time, start_date,  end_date):

    self.env = run.env
    self.run = run
    self.id_num = id_num        
    self.start_time = start_time
    self.end_time = end_time
    self.start_date = datetime.datetime.strptime(start_date, '%Y, %m, %d')
    self.end_date = datetime.datetime.strptime(end_date, '%Y, %m, %d')
    self.ad_type = ad_type

    self.avail = False
    self.run.env.process(self.set_availability())  

正如您所见,创建顾问 class 也会启动设置可用性的过程。在下面的示例中,我将其简化为在给定日期范围内每天设置相同的可用性。您当然可以根据 date/day 等设置不同的可用性

def set_availability(self):

    start_delay = self.start_time + (self.start_date - self.run.start_date).total_seconds()/3600 # this returns the time in hours until the resource becomes available and is applied below.
    end_delay = self.end_time + (self.start_date - self.run.start_date).total_seconds()/3600
    repeat = (self.end_date - self.start_date).days + 1  # defines how man days to repeat it for

    for i in range(repeat):

        start_delayed(self.run.env, self.add_to_store(), start_delay)
        start_delayed(self.run.env, self.remove_from_store(), end_delay)
        start_delay += 24
        end_delay += 24

    yield self.run.env.timeout(0)


def add_to_store(self):

    self.run.ad_avail.remove(self)  # take adviser from a list
    self.run.adviser_store.put(self)  # and put it in the store
    yield self.run.env.timeout(0)

def remove_from_store(self):        

    current_ad = yield self.run.adviser_store.get(lambda item: item.id_num == self.id_num)  # get itself from the store 
    self.run.ad_avail.append(current_ad) # and put it back in the list
    yield self.run.env.timeout(0)

所以本质上,客户只能向商店请求顾问,并且顾问只会在特定时间出现在商店中。其余时间它们在附加到当前模拟的列表中。运行。

我觉得这里还是有坑的。当顾问对象不可用时,它可能正在使用中。我还没有注意到这是否会发生,或者如果发生会产生什么影响。

我尝试了其他方法,我重载了资源 class,只添加了一个方法,虽然我不完全理解源代码,但它似乎可以正常工作。您可以告诉资源更改模拟中某处的容量。

from simpy.resources.resource import Resource, Request, Release
from simpy.core import BoundClass
from simpy.resources.base import BaseResource

class VariableResource(BaseResource):
    def __init__(self, env, capacity):
        super(VariableResource, self).__init__(env, capacity)
        self.users = []
        self.queue = self.put_queue

    @property
    def count(self):
        return len(self.users)

    request = BoundClass(Request)
    release = BoundClass(Release)

    def _do_put(self, event):
        if len(self.users) < self.capacity:
            self.users.append(event)
            event.usage_since = self._env.now
            event.succeed()

    def _do_get(self, event):
        try:
            self.users.remove(event.request)
        except ValueError:
            pass
        event.succeed()

    def _change_capacity(self, capacity):
        self._capacity = capacity

我认为这应该有效,但我对触发器的工作方式不是 100% 有信心。

我解决了每次 window 创建资源的问题。每个到达都在函数服务中处理,每个客户将根据到达时间分配给一个资源。如果客户必须在队列中等待并且必须 re-asigned 到下一个时间 window,则将其从当前资源中移除并 re-assigned 到下一个资源。这是通过将请求修改为:

with self.Morning.request() as req1:
    yield req1 | self.env.timeout(self.durationMorning)

代码:

import simpy
import numpy as np
import itertools

class Queue():
    def __init__(self, env, N_m, N_e):
        
        self.Arrival = {}
        self.StartService = {}
        self.FinishService = {}
        
        self.Morning = simpy.Resource(env, N_m)
        self.Evening = simpy.Resource(env, N_e)
        self.env = env
                
        self.durationMorning = 30
    
    #arrivals/second
    def t_arrival(self,t):
        if t<self.durationMorning:
            return 1
        else:
            return 2

    def t_service(self):
        return 5

    def service(self,i):
        arrival_time = self.env.now

        if arrival_time==self.durationMorning:
            yield self.env.timeout(0.0001)
            
        # Add Arrival
        system.Arrival[i] = arrival_time

        
        # Morning shift
        if self.env.now < self.durationMorning:
            
            with self.Morning.request() as req1:
                yield req1 | self.env.timeout(self.durationMorning)
                if self.env.now < self.durationMorning:
                    
                    system.StartService[i] = self.env.now
                    yield self.env.timeout(self.t_service())
                    
                    print(f'{i} arrived at {self.Arrival[i]} done at {self.env.now} by 1')
                    self.FinishService[i] = self.env.now
                    

        # Evening shift
        if (self.env.now >= self.durationMorning) & (i not in self.FinishService):
            

            with self.Evening.request() as req2:
                yield req2
                system.StartService[i] = self.env.now
                yield self.env.timeout(self.t_service())
                print(f'{i} arrived at {self.Arrival[i]} done at {self.env.now} by 2')
    

                self.FinishService[i] = self.env.now
        
    def arrivals(self):
        for i in itertools.count():
            self.env.process(self.service(i))
            t = self.t_arrival(self.env.now)
            yield self.env.timeout(t)  




env = simpy.Environment()
system = Queue(env, N_morning, N_evening)
system.env.process(system.arrivals())
system.env.run(until=60)



0 arrived at 0 done at 5 by 1
1 arrived at 1 done at 6 by 1
2 arrived at 2 done at 10 by 1
3 arrived at 3 done at 11 by 1
4 arrived at 4 done at 15 by 1
5 arrived at 5 done at 16 by 1
6 arrived at 6 done at 20 by 1
7 arrived at 7 done at 21 by 1
8 arrived at 8 done at 25 by 1
9 arrived at 9 done at 26 by 1
10 arrived at 10 done at 30 by 1
11 arrived at 11 done at 31 by 1
12 arrived at 12 done at 35 by 2
13 arrived at 13 done at 40 by 2
14 arrived at 14 done at 45 by 2
15 arrived at 15 done at 50 by 2
16 arrived at 16 done at 55 by 2