Python 进程等待

Python process wait

我有一个跨多个设备共享工作并在它们上启动进程的应用程序,它们经常需要相互同步。

  1. Web 应用打开到服务器的 SSH 连接。
  2. 每个服务器启动 n Python 个实例。
  3. 然后关闭 SSH 连接。

之后,每个代理唯一的通信方式就是使用 MQTT(发布/订阅主题)。我使用的 MQTT 库是 Paho-MQTT。

我目前的问题是我找不到让他们等待的好方法。我觉得 while true : sleep 不是一个好方法,但我不知道如何做得更好。

我当前的代码如下所示:

def wake_up(self, client, userdata, message) -> None:
   self.wake_up_token += 1

def wait(self) -> None:
   while self.wake_up_token == 0:
       sleep(self.wait_delay)
   self.wake_up_token -= 1

到目前为止,我唯一的想法是通过二分法找到最佳等待延迟,但这会很耗时,所以我希望有更好的解决方案。

注:

def wait():
   while self.wake_up_token == 0:
      pass

是一个糟糕的解决方案,因为进程将不断竞争以检查条件,这会使系统慢数千倍。

我猜的最小示例:(1 个代理,1 个调度程序)

from paho.mqtt.client import Client
from time import sleep

class Agent:

    def __init__(self, broker_ip: str, client_id: str):
        self.client: Client = Client(client_id=client_id)
        self.client.username_pw_set(username="something", password="veryverysecured")
        self.client.connect(host=broker_ip)
        self.client.subscribe("wake_up_topic")
        self.client.message_callback_add("wake_up_topic", self.wake_up)
        self.client.loop_start()

        self.wake_up_token = 0
        
    def wake_up(self, client, userdata, message) -> None:
        self.wake_up_token += 1
    
    def wait(self) -> None:
        while self.wake_up_token == 0:
            sleep(self.wait_delay)
        self.wake_up_token -= 1
    
    def run(self):
        while True:
            # do something
            self.client.publish(topic="agent/action_done", message="")
            self.wait()
            # do something

class Scheduler:

    def __init__(self, broker_ip: str):
        self.client: Client = Client(client_id="scheduler")
        self.client.username_pw_set(username="something", password="veryverysecured")
        self.client.connect(host=broker_ip)
        self.client.subscribe("agent/action_done")
        self.client.message_callback_add("agent/action_done", self.wake_up)
        self.client.loop_start()

        self.wake_up_token = 0

    def wake_up(self, client, userdata, message) -> None:
        self.wake_up_token += 1
    
    def wait(self) -> None:
        while self.wake_up_token == 0:
            sleep(self.wait_delay)
        self.wake_up_token -= 1
    
    def run(self):
        while True:

            #agents are doing their thing

            self.wait()
            self.client.publish(topic="wake_up_topic", message="")

我不知道它是否有用,但整个项目的目标是构建一个 MAS 跨多个服务器共享代理的框架。

我认为标准 Semaphore 在这种情况下最有用。 我无法对此进行测试,因为我没有 MQTT client/server 运行,但它应该是这样的:

import threading

class Scheduler:

    def __init__(self, broker_ip: str):
        # your init code here (unchanged), but add this line:
        self.semaphore = threading.Semaphore(0)

    def wake_up(self, client, userdata, message) -> None:
        self.semaphore.release()
    
    def wait(self) -> None:
        self.semaphore.acquire()