Python3 ZMQ,中断函数并在收到每条新消息时调用另一个函数

Python3 ZMQ, Interrupt function and calling another on each new message received

这是我的问题:我有 2 个程序在任意 tcp 端口上通过 zmq 进行通信。 当#1 收到来自#2 的消息时,他必须调用一些函数。 如果#1 在当前函数结束之前收到消息,我希望#1 中断当前函数并调用新函数。 我尝试使用 threading.Event 来中断函数。 我不知道 zmq 是否适合我的需要,或者套接字类型是否合适。

为了简化,我展示了最简单的版本,这是我尝试过的: p1.py

import zmq
from threading import Event

port_p2 = "6655"
context = zmq.Context()
socket = context.socket(zmq.PAIR)
socket.connect("tcp://localhost:%s" % port_p2)
print("port 6655")

__exit1 = Event()
__exit2 = Event()


def action1():
    __exit1.clear()
    __exit2.set()
    while not __exit1.is_set():
        for i in range(1, 20):
            print(i)
            time.sleep(1)
        __exit1.set()


def action2():
    __exit2.clear()
    __exit1.set()
    while not __exit2.is_set():
        for i in range(1, 20):
            print(i * 100)
            time.sleep(1)
        __exit2.set()


if __name__ == "__main__":
    try:
        while True:
            try:
                string = socket.recv(flags=zmq.NOBLOCK)
                # message received, process it
                string = str(string, 'utf-8')
                if "Action1" in string:
                    action1()
                if "Action2" in string:
                    action2()
            except zmq.Again as e:
                # No messages waiting to be processed
                pass
            time.sleep(0.1)
    except(KeyboardInterrupt, SystemExit):
        print("exit")

和p2.py

import time
import random

port_p1 = "6655"
context = zmq.Context()
socket_p1 = context.socket(zmq.PAIR)
socket_p1.bind("tcp://*:%s" % port_p1)
print("port 6655")


if __name__ == "__main__":

    while True:
        i = random.choice(range(1, 10))
        print(i)
        try:
            if random.choice([True, False]):
                print("Action 1")
                socket_p1.send(b'Action1')
            else:
                socket_p1.send(b'Action2')
                print("Action 2")
        except zmq.Again as e:
            pass
        time.sleep(i)

为了我的目的,我不想/不能使用系统信号

如果有任何意见,我将不胜感激,请毫不犹豫地要求准确,我不得不承认,我在写下这些内容时遇到了困难。 谢谢

Q : like #1 to interrupt the current function…

鉴于您禁止使用信号,#1 只能被动地发出信号(无论是否通过当前的 ZeroMQ 基础设施)function,不再继续 return 过早成熟的方式(所以 fun() 必须进行适当的修改才能进行主动重新检查,最好以某种相当精细的渐进方式,定期主动检查 #1 是否被动地向 RET 发出信号( "tell" fun() ),无论出于什么原因和方式 #1 有和曾经这样做过)。

另一种机会是使用套接字监视器扩展已经存在的 ZeroMQ 基础设施(Context()-实例(s))并使 fun() .connect()-直接到 socket-monitor 资源以主动了解到达#1 的任何新消息(即自主地,没有#1 的主动性)并决定以过早的方式return,在那些情况下,根据您的可行情况应用程序逻辑。

对于 socket-monitor 案例,API 文档包含实施所需的所有细节,否则将超出 Stack Overflow 的范围 post。