在不终止进程的情况下中止 Python 进程中的代码执行
Aborting code execution in a Python Process without terminating the process
假设我有一个 (websocket) API、api.py
,因此:
from flask import Flask, request
from flask_socketio import SocketIO, emit
from worker import Worker
app = Flask()
socketio = SocketIO(app)
worker = Worker()
worker.start()
@socketio.on('connect')
def connect():
print("Client", request.sid, "connected")
@socketio.on('get_results')
def get_results(query):
"""
The only endpoing of the API.
"""
print("Client", request.sid, "requested results for query", query)
# Set the worker to work, wait for results to be ready, and
# send the results back to the client.
worker.task_queue.put(query)
results = worker.result_queue.get()
emit("results", results)
@socketio.on('disconnect')
def disconnect():
print("Client", request.sid, "disconnected, perhaps before results where ready")
# What to do here?
socketio.run(app, host='')
a API 将服务于许多客户端,但只有一个 worker 来产生应该服务的结果。 worker.py
:
from multiprocessing import Process, Queue
class Worker(Process):
def __init__(self):
super().__init__()
self.task_queue = Queue()
self.result_queue = Queue()
self.some_stateful_variable = 0
# Do other computationally expensive work
def reset_state(self):
# Computationally inexpensive.
pass
def do_work(self, task):
# Computationally expensive. Takes long time.
# Modifies internal state.
pass
def run(self):
while True:
task = self.task_queue.get()
results = self.do_work(task)
self.result_queue.put(results)
工作人员收到请求,即要完成的任务,并着手产生结果。当结果准备好时,客户端将被送达。
但并不是所有的客户都有耐心。在结果准备好之前,他们可能会离开,即与 API 断开连接。他们不想要它们,因此工人最终从事一项不需要完成的任务。这使得队列中的其他客户端不必要地等待。如何避免这种情况,并让 worker 中止执行 do_work
不需要完成的任务?
在客户端:当用户关闭浏览器选项卡或离开页面时向您的 Flask 服务器发送请求,该请求应包含您要取消的任务的 ID。
在服务器端将任务的取消状态放入数据库或 Flask Server 与您的工作进程之间的任何共享变量
将任务处理分成几个阶段,在每个阶段之前检查数据库中任务的状态,如果状态被取消 - 停止任务处理。
第 1 点的另一个选择是在服务器端在单独的进程中进行一些监视 - 计算来自客户端的状态请求之间的间隔。
我通过启动一个完全独立的进程来处理类似的问题:
sp.call('start python path\worker.py', shell=True)
worker.py 然后会通过 redis 将其 PID 报告回 api.py,然后在 api.py
的任何时候直接终止进程
当然,这对你来说有多可行将取决于有多少数据驻留在 api.py 中并共享给 worker.py - 通过 redis 传递是否可行是由您决定。
额外的好处是您可以将套接字与繁重的计算分离 - 您可以进入准多核(每个 worker.py 的单线程)。如果您愿意,您可以通过将多处理合并到每个 worker.py 中来实现完整的多核。
假设我有一个 (websocket) API、api.py
,因此:
from flask import Flask, request
from flask_socketio import SocketIO, emit
from worker import Worker
app = Flask()
socketio = SocketIO(app)
worker = Worker()
worker.start()
@socketio.on('connect')
def connect():
print("Client", request.sid, "connected")
@socketio.on('get_results')
def get_results(query):
"""
The only endpoing of the API.
"""
print("Client", request.sid, "requested results for query", query)
# Set the worker to work, wait for results to be ready, and
# send the results back to the client.
worker.task_queue.put(query)
results = worker.result_queue.get()
emit("results", results)
@socketio.on('disconnect')
def disconnect():
print("Client", request.sid, "disconnected, perhaps before results where ready")
# What to do here?
socketio.run(app, host='')
a API 将服务于许多客户端,但只有一个 worker 来产生应该服务的结果。 worker.py
:
from multiprocessing import Process, Queue
class Worker(Process):
def __init__(self):
super().__init__()
self.task_queue = Queue()
self.result_queue = Queue()
self.some_stateful_variable = 0
# Do other computationally expensive work
def reset_state(self):
# Computationally inexpensive.
pass
def do_work(self, task):
# Computationally expensive. Takes long time.
# Modifies internal state.
pass
def run(self):
while True:
task = self.task_queue.get()
results = self.do_work(task)
self.result_queue.put(results)
工作人员收到请求,即要完成的任务,并着手产生结果。当结果准备好时,客户端将被送达。
但并不是所有的客户都有耐心。在结果准备好之前,他们可能会离开,即与 API 断开连接。他们不想要它们,因此工人最终从事一项不需要完成的任务。这使得队列中的其他客户端不必要地等待。如何避免这种情况,并让 worker 中止执行 do_work
不需要完成的任务?
在客户端:当用户关闭浏览器选项卡或离开页面时向您的 Flask 服务器发送请求,该请求应包含您要取消的任务的 ID。
在服务器端将任务的取消状态放入数据库或 Flask Server 与您的工作进程之间的任何共享变量
将任务处理分成几个阶段,在每个阶段之前检查数据库中任务的状态,如果状态被取消 - 停止任务处理。
第 1 点的另一个选择是在服务器端在单独的进程中进行一些监视 - 计算来自客户端的状态请求之间的间隔。
我通过启动一个完全独立的进程来处理类似的问题:
sp.call('start python path\worker.py', shell=True)
worker.py 然后会通过 redis 将其 PID 报告回 api.py,然后在 api.py
的任何时候直接终止进程当然,这对你来说有多可行将取决于有多少数据驻留在 api.py 中并共享给 worker.py - 通过 redis 传递是否可行是由您决定。
额外的好处是您可以将套接字与繁重的计算分离 - 您可以进入准多核(每个 worker.py 的单线程)。如果您愿意,您可以通过将多处理合并到每个 worker.py 中来实现完整的多核。