Simpy - 如何让项目在调用 get 方法之前等待任意时间?

Simpy - How to let item wait for some arbitrary time before get method called?

我想模拟一种通信过程,源端不断产生消息发送给接收端,而消息要经过一段距离才能到达目的地。然而,距离可能会随着时间而改变,因此每条消息在电缆中的持续时间是不一样的。我的第一种方法是使用 env.timeout(t_i),其中 t_i 是电缆中第 i 条消息的时间。我的代码看起来像这样,

import simpy
import random

class network:

    def __init__(self) -> None:
        self.env = simpy.Environment()
        self.cable = simpy.Store(self.env)

    def generateMessage(self):
        
        i = 0
        while True:

            message = (f'message: {i}', f'travel time {0.1 + round(random.random(), 3)}', (round(random.random(), 3)))
            i += 1
            print(f'gen at: {self.env.now} {message}')
            yield self.env.timeout(0.1)
            self.cable.put(message)

    def receieveMessage(self):

        while True:

            # wait for message to travel in the cable for 'sometime' before arrive
            
            message = yield self.cable.get()
            yield self.env.timeout(message[2]) #<- This is not the right approach

            print(f'receieve at {self.env.now}, message: {message[:2]}')


    def run(self):

        self.env.process(self.generateMessage())
        self.env.process(self.receieveMessage())

        self.env.run(until=1)

q = network()
q.run()

输出在哪里

gen at: 0 ('message: 0', 'travel time 0.22', 0.22)
gen at: 0.1 ('message: 1', 'travel time 0.71', 0.71)
gen at: 0.2 ('message: 2', 'travel time 0.377', 0.377)
gen at: 0.30000000000000004 ('message: 3', 'travel time 1.056', 1.056)
receieve at 0.32, message: ('message: 0', 'travel time 0.22')
gen at: 0.4 ('message: 4', 'travel time 0.138', 0.138)
gen at: 0.5 ('message: 5', 'travel time 0.45299999999999996', 0.45299999999999996)
gen at: 0.6 ('message: 6', 'travel time 1.024', 1.024)
gen at: 0.7 ('message: 7', 'travel time 0.695', 0.695)
gen at: 0.7999999999999999 ('message: 8', 'travel time 0.503', 0.503)
gen at: 0.8999999999999999 ('message: 9', 'travel time 0.702', 0.702)
gen at: 0.9999999999999999 ('message: 10', 'travel time 0.75', 0.75)

这不是我想要的,因为在这种方法中,receieveMessage 方法的每个循环在下一次迭代之前等待 travel_time,而不是在获取之前等待 travel_time 的消息延迟通过 receieveMessage 方法。

期望的输出应该是这样的,

gen at: 0 ('message: 0', 'travel time 0.207')
gen at: 0.1 ('message: 1', 'travel time 0.735')
receieve at 0.207, message: ('message: 0', 'travel time 0.207')
gen at: 0.2 ('message: 2', 'travel time 0.498')
gen at: 0.3 ('message: 3', 'travel time 0.492')
gen at: 0.4 ('message: 4', 'travel time 0.864')
gen at: 0.5 ('message: 5', 'travel time 0.241')
gen at: 0.6 ('message: 6', 'travel time 0.505')
receieve at 0.698, message: ('message: 2', 'travel time 0.498')
gen at: 0.7 ('message: 7', 'travel time 0.76')
receieve at 0.741, message: ('message: 5', 'travel time 0.241')
receieve at 0.792, message: ('message: 3', 'travel time 0.492')
gen at: 0.8 ('message: 8', 'travel time 0.815')
receieve at 0.835, message: ('message: 1', 'travel time 0.735')
gen at: 0.9 ('message: 9', 'travel time 0.104')
gen at: 1 ('message: 10', 'travel time 0.524')

# Message of generated message which should not print out be cause the util arg.
receieve at 1.004, message: ('message: 9', 'travel time 0.104')
receieve at 1.105, message: ('message: 6', 'travel time 0.505')
receieve at 1.264, message: ('message: 4', 'travel time 0.864')
receieve at 1.46, message: ('message: 7', 'travel time 0.76')
receieve at 1.524, message: ('message: 10', 'travel time 0.524')
receieve at 1.615, message: ('message: 8', 'travel time 0.815')


我该如何实现?我正在考虑为每条消息引入另一个过程,但这似乎只能是一个想法。

我认为您真的不需要 simpy。我做了很多这样的模拟,没有任何不尊重的意思,我通常发现自己动手更容易。

import random

class network:

    def __init__(self) -> None:
        self.time = 0
        self.msg = 0
        self.cable = []

    def run(self):
        # Fill.
        while self.time < 1.0:
            self.checkAlarms()
            self.generateMessage()
            self.time += 0.1
        # Drain.
        while self.cable:
            self.checkAlarms()
            self.time += 0.1

    def checkAlarms(self):
        self.cable.sort()
        newlist = []
        for t,msg in self.cable:
            if t < self.time:
                self.receiveMessage( t, msg )
            else:
                newlist.append( (t,msg) )
        self.cable = newlist

    def generateMessage(self):
        travel = random.random() + 0.1
        message = (f'message: {self.msg}', f'travel time {travel:.3f}')
        self.msg += 1
        print(f'gen at: {self.time:.3f} {message}')
        self.cable.append( (self.time+travel, message ) )

    def receiveMessage(self, t, message):
        print(f'receive at {t:.3f}, message: {message}')


q = network()
q.run()

输出:

C:\tmp>python x.py                                                
gen at: 0.000 ('message: 0', 'travel time 0.635')
gen at: 0.100 ('message: 1', 'travel time 1.088')
gen at: 0.200 ('message: 2', 'travel time 0.945')
gen at: 0.300 ('message: 3', 'travel time 0.424')
gen at: 0.400 ('message: 4', 'travel time 0.321')
gen at: 0.500 ('message: 5', 'travel time 0.626')
gen at: 0.600 ('message: 6', 'travel time 0.660')
receive at 0.635, message: ('message: 0', 'travel time 0.635')
gen at: 0.700 ('message: 7', 'travel time 0.288')
receive at 0.721, message: ('message: 4', 'travel time 0.321')
receive at 0.724, message: ('message: 3', 'travel time 0.424')
gen at: 0.800 ('message: 8', 'travel time 1.031')
gen at: 0.900 ('message: 9', 'travel time 0.443')
receive at 0.988, message: ('message: 7', 'travel time 0.288')
gen at: 1.000 ('message: 10', 'travel time 0.468')
receive at 1.126, message: ('message: 5', 'travel time 0.626')
receive at 1.145, message: ('message: 2', 'travel time 0.945')
receive at 1.188, message: ('message: 1', 'travel time 1.088')
receive at 1.260, message: ('message: 6', 'travel time 0.660')
receive at 1.343, message: ('message: 9', 'travel time 0.443')
receive at 1.468, message: ('message: 10', 'travel time 0.468')
receive at 1.831, message: ('message: 8', 'travel time 1.031')

好的,这是一个简单的答案。这里的关键是你的接收者不是一个进程。消息的接收是事件的结果。因此,创建一个超时事件,并让接收者成为该事件的回调。

import simpy
import random

class network:

    def __init__(self) -> None:
        self.env = simpy.Environment()

    def generateMessage(self):
        
        for i in range(10):
            elapse = random.random() + 0.1
            message = (f'message: {i}', f'travel time {elapse:.3f}')
            print(f'gen at: {self.env.now:.3f} {message}')
            event = self.env.timeout( elapse )
            event.data = message
            event.callbacks.append( self.receiveMessage )
            yield self.env.timeout(0.1)

    def receiveMessage(self, event):
        print(f'receive at {self.env.now:.3f}, message: {event.data}')

    def run(self):
        self.env.process(self.generateMessage())
        self.env.run(until=2)

q = network()
q.run()

输出:

gen at: 0.000 ('message: 0', 'travel time 1.047')
gen at: 0.100 ('message: 1', 'travel time 0.179')
gen at: 0.200 ('message: 2', 'travel time 0.965')
receive at 0.279, message: ('message: 1', 'travel time 0.179')
gen at: 0.300 ('message: 3', 'travel time 0.913')
gen at: 0.400 ('message: 4', 'travel time 0.210')
gen at: 0.500 ('message: 5', 'travel time 0.277')
gen at: 0.600 ('message: 6', 'travel time 0.249')
receive at 0.610, message: ('message: 4', 'travel time 0.210')
gen at: 0.700 ('message: 7', 'travel time 0.941')
receive at 0.777, message: ('message: 5', 'travel time 0.277')
gen at: 0.800 ('message: 8', 'travel time 1.091')
receive at 0.849, message: ('message: 6', 'travel time 0.249')
gen at: 0.900 ('message: 9', 'travel time 0.968')
receive at 1.047, message: ('message: 0', 'travel time 1.047')
receive at 1.165, message: ('message: 2', 'travel time 0.965')
receive at 1.213, message: ('message: 3', 'travel time 0.913')
receive at 1.641, message: ('message: 7', 'travel time 0.941')
receive at 1.868, message: ('message: 9', 'travel time 0.968')
receive at 1.891, message: ('message: 8', 'travel time 1.091')

我认为如果您一次可以发送多条消息,simpy 会更有用。当您建议创建另一个流程时,我也认为您走在正确的轨道上。

我将生成器与发送进程分开,因此消息可以作为独立进程竞争公共资源(通道)。向上通道查看消息发送时间戳如何重叠

"""
quick sim to send messages over a network chanel

programmer: Michael R. Gibbs
"""

import simpy
import random

class Network():

    def __init__(self, env, num_of_chanels):
        """
        The num_of_chanels limits how many messages
        can be "sending" at one time.
        If all the chanels are busy, then new messages queue up
        for next available chanel
        """
        self.env = env
        self.num_of_chanels = num_of_chanels

        self.chanels = simpy.Resource(env, num_of_chanels)

    def send_mess(self, mess):
        """
        sim process for sending a message
        All the send mess process queue up for a channel resource
        Then with the channel 'sends' the messaage
        The release the channel for the next senm mess process in the queue

        mess is a tuple of (id, message text, send time)
        """

        print(f'{self.env.now}: - message: {mess[0]} being queued')

        with self.chanels.request() as req:
            yield req

            print(f'{self.env.now}: - message: {mess[0]} being sent')

            yield self.env.timeout(mess[2])

            print(f'{self.env.now}: - message: {mess[0]} has been sent {mess[1]}')


def gen_messages(env, network):
    """
    Generates a series of messages with a random send time

    """

    id = 1

    while True:
        # change the randint params to stress the queue more
        yield env.timeout(random.randint(1,3))

        # (id, mess text, send time)
        mess = (id, 'mess ' + str(id), random.randint(1,4))
        id += 1

        # no yield here, just drop and go
        env.process(network.send_mess(mess))

# create and start sim
env = simpy.Environment()
network = Network(env,1) # start with just one chanel

env.process(gen_messages(env, network))
env.run(50)

在思考蒂姆罗伯茨的回答后,我想出了适合我的用例的自己的答案。正如 Michael 指出的那样,为每条消息创建另一个进程也同样有效。

import simpy
import random

class network:

    def __init__(self) -> None:
        self.env = simpy.Environment()
        self.cable = simpy.Store(self.env)

    def generateMessage(self):
        
        i = 0
        while True:

            travel_time = round(random.random(), 3) + 0.1
            message = (f'message: {i}', f'travel time {travel_time:.3f}', travel_time)
            i += 1
            print(f'gen at: {self.env.now:.3f} {message}')
            self.env.process(self.travel(message))
            yield self.env.timeout(0.1)
            

    def receieveMessage(self):

        while True:
            
            message = yield self.cable.get()

            print(f'receieve at {self.env.now:.3f}, message: {message[:2]}')

    def travel(self, message):

        yield self.env.timeout(message[2])
        self.cable.put(message)

    def run(self):

        self.env.process(self.generateMessage())
        self.env.process(self.receieveMessage())

        self.env.run(until=1)

q = network()
q.run()

输出:

gen at: 0.000 ('message: 0', 'travel time 1.068', 1.068)
gen at: 0.100 ('message: 1', 'travel time 0.299', 0.29900000000000004)
gen at: 0.200 ('message: 2', 'travel time 0.228', 0.228)
gen at: 0.300 ('message: 3', 'travel time 0.535', 0.535)
receieve at 0.399, message: ('message: 1', 'travel time 0.299')
gen at: 0.400 ('message: 4', 'travel time 0.778', 0.778)
receieve at 0.428, message: ('message: 2', 'travel time 0.228')
gen at: 0.500 ('message: 5', 'travel time 0.953', 0.953)
gen at: 0.600 ('message: 6', 'travel time 0.344', 0.344)
gen at: 0.700 ('message: 7', 'travel time 0.686', 0.6859999999999999)
gen at: 0.800 ('message: 8', 'travel time 0.476', 0.476)
receieve at 0.835, message: ('message: 3', 'travel time 0.535')
gen at: 0.900 ('message: 9', 'travel time 1.053', 1.053)
receieve at 0.944, message: ('message: 6', 'travel time 0.344')
gen at: 1.000 ('message: 10', 'travel time 0.866', 0.866)