Python3 ZMQ,中断函数并在收到每条新消息时调用另一个函数
Python3 ZMQ, Interrupt function and calling another on each new message received
这是我的问题:我有 2 个程序在任意 tcp 端口上通过 zmq 进行通信。
当#1 收到来自#2 的消息时,他必须调用一些函数。
如果#1 在当前函数结束之前收到消息,我希望#1 中断当前函数并调用新函数。
我尝试使用 threading.Event 来中断函数。
我不知道 zmq 是否适合我的需要,或者套接字类型是否合适。
为了简化,我展示了最简单的版本,这是我尝试过的:
p1.py
import zmq
from threading import Event
port_p2 = "6655"
context = zmq.Context()
socket = context.socket(zmq.PAIR)
socket.connect("tcp://localhost:%s" % port_p2)
print("port 6655")
__exit1 = Event()
__exit2 = Event()
def action1():
__exit1.clear()
__exit2.set()
while not __exit1.is_set():
for i in range(1, 20):
print(i)
time.sleep(1)
__exit1.set()
def action2():
__exit2.clear()
__exit1.set()
while not __exit2.is_set():
for i in range(1, 20):
print(i * 100)
time.sleep(1)
__exit2.set()
if __name__ == "__main__":
try:
while True:
try:
string = socket.recv(flags=zmq.NOBLOCK)
# message received, process it
string = str(string, 'utf-8')
if "Action1" in string:
action1()
if "Action2" in string:
action2()
except zmq.Again as e:
# No messages waiting to be processed
pass
time.sleep(0.1)
except(KeyboardInterrupt, SystemExit):
print("exit")
和p2.py
import time
import random
port_p1 = "6655"
context = zmq.Context()
socket_p1 = context.socket(zmq.PAIR)
socket_p1.bind("tcp://*:%s" % port_p1)
print("port 6655")
if __name__ == "__main__":
while True:
i = random.choice(range(1, 10))
print(i)
try:
if random.choice([True, False]):
print("Action 1")
socket_p1.send(b'Action1')
else:
socket_p1.send(b'Action2')
print("Action 2")
except zmq.Again as e:
pass
time.sleep(i)
为了我的目的,我不想/不能使用系统信号
如果有任何意见,我将不胜感激,请毫不犹豫地要求准确,我不得不承认,我在写下这些内容时遇到了困难。
谢谢
Q : …like #1 to interrupt the current function…
鉴于您禁止使用信号,#1 只能被动地发出信号(无论是否通过当前的 ZeroMQ 基础设施)function,不再继续 return 过早成熟的方式(所以 fun()
必须进行适当的修改才能进行主动重新检查,最好以某种相当精细的渐进方式,定期主动检查 #1 是否被动地向 RET
发出信号( "tell" fun()
),无论出于什么原因和方式 #1 有和曾经这样做过)。
另一种机会是使用套接字监视器扩展已经存在的 ZeroMQ 基础设施(Context()
-实例(s))并使 fun()
.connect()
-直接到 socket-monitor 资源以主动了解到达#1 的任何新消息(即自主地,没有#1 的主动性)并决定以过早的方式return,在那些情况下,根据您的可行情况应用程序逻辑。
对于 socket-monitor 案例,API 文档包含实施所需的所有细节,否则将超出 Stack Overflow 的范围 post。
这是我的问题:我有 2 个程序在任意 tcp 端口上通过 zmq 进行通信。 当#1 收到来自#2 的消息时,他必须调用一些函数。 如果#1 在当前函数结束之前收到消息,我希望#1 中断当前函数并调用新函数。 我尝试使用 threading.Event 来中断函数。 我不知道 zmq 是否适合我的需要,或者套接字类型是否合适。
为了简化,我展示了最简单的版本,这是我尝试过的: p1.py
import zmq
from threading import Event
port_p2 = "6655"
context = zmq.Context()
socket = context.socket(zmq.PAIR)
socket.connect("tcp://localhost:%s" % port_p2)
print("port 6655")
__exit1 = Event()
__exit2 = Event()
def action1():
__exit1.clear()
__exit2.set()
while not __exit1.is_set():
for i in range(1, 20):
print(i)
time.sleep(1)
__exit1.set()
def action2():
__exit2.clear()
__exit1.set()
while not __exit2.is_set():
for i in range(1, 20):
print(i * 100)
time.sleep(1)
__exit2.set()
if __name__ == "__main__":
try:
while True:
try:
string = socket.recv(flags=zmq.NOBLOCK)
# message received, process it
string = str(string, 'utf-8')
if "Action1" in string:
action1()
if "Action2" in string:
action2()
except zmq.Again as e:
# No messages waiting to be processed
pass
time.sleep(0.1)
except(KeyboardInterrupt, SystemExit):
print("exit")
和p2.py
import time
import random
port_p1 = "6655"
context = zmq.Context()
socket_p1 = context.socket(zmq.PAIR)
socket_p1.bind("tcp://*:%s" % port_p1)
print("port 6655")
if __name__ == "__main__":
while True:
i = random.choice(range(1, 10))
print(i)
try:
if random.choice([True, False]):
print("Action 1")
socket_p1.send(b'Action1')
else:
socket_p1.send(b'Action2')
print("Action 2")
except zmq.Again as e:
pass
time.sleep(i)
为了我的目的,我不想/不能使用系统信号
如果有任何意见,我将不胜感激,请毫不犹豫地要求准确,我不得不承认,我在写下这些内容时遇到了困难。 谢谢
Q : …like #1 to interrupt the current function…
鉴于您禁止使用信号,#1 只能被动地发出信号(无论是否通过当前的 ZeroMQ 基础设施)function,不再继续 return 过早成熟的方式(所以 fun()
必须进行适当的修改才能进行主动重新检查,最好以某种相当精细的渐进方式,定期主动检查 #1 是否被动地向 RET
发出信号( "tell" fun()
),无论出于什么原因和方式 #1 有和曾经这样做过)。
另一种机会是使用套接字监视器扩展已经存在的 ZeroMQ 基础设施(Context()
-实例(s))并使 fun()
.connect()
-直接到 socket-monitor 资源以主动了解到达#1 的任何新消息(即自主地,没有#1 的主动性)并决定以过早的方式return,在那些情况下,根据您的可行情况应用程序逻辑。
对于 socket-monitor 案例,API 文档包含实施所需的所有细节,否则将超出 Stack Overflow 的范围 post。