Python 进程等待
Python process wait
我有一个跨多个设备共享工作并在它们上启动进程的应用程序,它们经常需要相互同步。
- Web 应用打开到服务器的 SSH 连接。
- 每个服务器启动 n Python 个实例。
- 然后关闭 SSH 连接。
之后,每个代理唯一的通信方式就是使用 MQTT(发布/订阅主题)。我使用的 MQTT 库是 Paho-MQTT。
我目前的问题是我找不到让他们等待的好方法。我觉得 while true : sleep
不是一个好方法,但我不知道如何做得更好。
我当前的代码如下所示:
def wake_up(self, client, userdata, message) -> None:
self.wake_up_token += 1
def wait(self) -> None:
while self.wake_up_token == 0:
sleep(self.wait_delay)
self.wake_up_token -= 1
到目前为止,我唯一的想法是通过二分法找到最佳等待延迟,但这会很耗时,所以我希望有更好的解决方案。
注:
def wait():
while self.wake_up_token == 0:
pass
是一个糟糕的解决方案,因为进程将不断竞争以检查条件,这会使系统慢数千倍。
我猜的最小示例:(1 个代理,1 个调度程序)
from paho.mqtt.client import Client
from time import sleep
class Agent:
def __init__(self, broker_ip: str, client_id: str):
self.client: Client = Client(client_id=client_id)
self.client.username_pw_set(username="something", password="veryverysecured")
self.client.connect(host=broker_ip)
self.client.subscribe("wake_up_topic")
self.client.message_callback_add("wake_up_topic", self.wake_up)
self.client.loop_start()
self.wake_up_token = 0
def wake_up(self, client, userdata, message) -> None:
self.wake_up_token += 1
def wait(self) -> None:
while self.wake_up_token == 0:
sleep(self.wait_delay)
self.wake_up_token -= 1
def run(self):
while True:
# do something
self.client.publish(topic="agent/action_done", message="")
self.wait()
# do something
class Scheduler:
def __init__(self, broker_ip: str):
self.client: Client = Client(client_id="scheduler")
self.client.username_pw_set(username="something", password="veryverysecured")
self.client.connect(host=broker_ip)
self.client.subscribe("agent/action_done")
self.client.message_callback_add("agent/action_done", self.wake_up)
self.client.loop_start()
self.wake_up_token = 0
def wake_up(self, client, userdata, message) -> None:
self.wake_up_token += 1
def wait(self) -> None:
while self.wake_up_token == 0:
sleep(self.wait_delay)
self.wake_up_token -= 1
def run(self):
while True:
#agents are doing their thing
self.wait()
self.client.publish(topic="wake_up_topic", message="")
我不知道它是否有用,但整个项目的目标是构建一个 MAS 跨多个服务器共享代理的框架。
我认为标准 Semaphore 在这种情况下最有用。
我无法对此进行测试,因为我没有 MQTT client/server 运行,但它应该是这样的:
import threading
class Scheduler:
def __init__(self, broker_ip: str):
# your init code here (unchanged), but add this line:
self.semaphore = threading.Semaphore(0)
def wake_up(self, client, userdata, message) -> None:
self.semaphore.release()
def wait(self) -> None:
self.semaphore.acquire()
我有一个跨多个设备共享工作并在它们上启动进程的应用程序,它们经常需要相互同步。
- Web 应用打开到服务器的 SSH 连接。
- 每个服务器启动 n Python 个实例。
- 然后关闭 SSH 连接。
之后,每个代理唯一的通信方式就是使用 MQTT(发布/订阅主题)。我使用的 MQTT 库是 Paho-MQTT。
我目前的问题是我找不到让他们等待的好方法。我觉得 while true : sleep
不是一个好方法,但我不知道如何做得更好。
我当前的代码如下所示:
def wake_up(self, client, userdata, message) -> None:
self.wake_up_token += 1
def wait(self) -> None:
while self.wake_up_token == 0:
sleep(self.wait_delay)
self.wake_up_token -= 1
到目前为止,我唯一的想法是通过二分法找到最佳等待延迟,但这会很耗时,所以我希望有更好的解决方案。
注:
def wait():
while self.wake_up_token == 0:
pass
是一个糟糕的解决方案,因为进程将不断竞争以检查条件,这会使系统慢数千倍。
我猜的最小示例:(1 个代理,1 个调度程序)
from paho.mqtt.client import Client
from time import sleep
class Agent:
def __init__(self, broker_ip: str, client_id: str):
self.client: Client = Client(client_id=client_id)
self.client.username_pw_set(username="something", password="veryverysecured")
self.client.connect(host=broker_ip)
self.client.subscribe("wake_up_topic")
self.client.message_callback_add("wake_up_topic", self.wake_up)
self.client.loop_start()
self.wake_up_token = 0
def wake_up(self, client, userdata, message) -> None:
self.wake_up_token += 1
def wait(self) -> None:
while self.wake_up_token == 0:
sleep(self.wait_delay)
self.wake_up_token -= 1
def run(self):
while True:
# do something
self.client.publish(topic="agent/action_done", message="")
self.wait()
# do something
class Scheduler:
def __init__(self, broker_ip: str):
self.client: Client = Client(client_id="scheduler")
self.client.username_pw_set(username="something", password="veryverysecured")
self.client.connect(host=broker_ip)
self.client.subscribe("agent/action_done")
self.client.message_callback_add("agent/action_done", self.wake_up)
self.client.loop_start()
self.wake_up_token = 0
def wake_up(self, client, userdata, message) -> None:
self.wake_up_token += 1
def wait(self) -> None:
while self.wake_up_token == 0:
sleep(self.wait_delay)
self.wake_up_token -= 1
def run(self):
while True:
#agents are doing their thing
self.wait()
self.client.publish(topic="wake_up_topic", message="")
我不知道它是否有用,但整个项目的目标是构建一个 MAS 跨多个服务器共享代理的框架。
我认为标准 Semaphore 在这种情况下最有用。 我无法对此进行测试,因为我没有 MQTT client/server 运行,但它应该是这样的:
import threading
class Scheduler:
def __init__(self, broker_ip: str):
# your init code here (unchanged), but add this line:
self.semaphore = threading.Semaphore(0)
def wake_up(self, client, userdata, message) -> None:
self.semaphore.release()
def wait(self) -> None:
self.semaphore.acquire()