在不终止进程的情况下中止 Python 进程中的代码执行

Aborting code execution in a Python Process without terminating the process

假设我有一个 (websocket) API、api.py,因此:

from flask import Flask, request
from flask_socketio import SocketIO, emit
from worker import Worker

app = Flask()
socketio = SocketIO(app)
worker = Worker()
worker.start()

@socketio.on('connect')
def connect():
    print("Client", request.sid, "connected")

@socketio.on('get_results')
def get_results(query):
    """
    The only endpoing of the API.
    """
    print("Client", request.sid, "requested results for query", query)
    # Set the worker to work, wait for results to be ready, and
    # send the results back to the client.
    worker.task_queue.put(query)
    results = worker.result_queue.get()
    emit("results", results)

@socketio.on('disconnect')
def disconnect():
    print("Client", request.sid, "disconnected, perhaps before results where ready")
    # What to do here?

socketio.run(app, host='')

a API 将服务于许多客户端,但只有一个 worker 来产生应该服务的结果。 worker.py:

from multiprocessing import Process, Queue

class Worker(Process):
    def __init__(self):
        super().__init__()
        self.task_queue = Queue()
        self.result_queue = Queue()
        self.some_stateful_variable = 0
        # Do other computationally expensive work
        
    def reset_state(self):
        # Computationally inexpensive.
        pass

    def do_work(self, task):
        # Computationally expensive. Takes long time.
        # Modifies internal state.
        pass

    def run(self):
        while True:
            task = self.task_queue.get()
            results = self.do_work(task)
            self.result_queue.put(results)

工作人员收到请求,即要完成的任务,并着手产生结果。当结果准备好时,客户端将被送达。

但并不是所有的客户都有耐心。在结果准备好之前,他们可能会离开,即与 API 断开连接。他们不想要它们,因此工人最终从事一项不需要完成的任务。这使得队列中的其他客户端不必要地等待。如何避免这种情况,并让 worker 中止执行 do_work 不需要完成的任务?

  1. 在客户端:当用户关闭浏览器选项卡或离开页面时向您的 Flask 服务器发送请求,该请求应包含您要取消的任务的 ID。

  2. 在服务器端将任务的取消状态放入数据库或 Flask Server 与您的工作进程之间的任何共享变量

  3. 将任务处理分成几个阶段,在每个阶段之前检查数据库中任务的状态,如果状态被取消 - 停止任务处理。

第 1 点的另一个选择是在服务器端在单独的进程中进行一些监视 - 计算来自客户端的状态请求之间的间隔。

我通过启动一个完全独立的进程来处理类似的问题:

sp.call('start python path\worker.py', shell=True)

worker.py 然后会通过 redis 将其 PID 报告回 api.py,然后在 api.py

的任何时候直接终止进程

当然,这对你来说有多可行将取决于有多少数据驻留在 api.py 中并共享给 worker.py - 通过 redis 传递是否可行是由您决定。

额外的好处是您可以将套接字与繁重的计算分离 - 您可以进入准多核(每个 worker.py 的单线程)。如果您愿意,您可以通过将多处理合并到每个 worker.py 中来实现完整的多核。