可取消后端调用的架构推荐 [zerorpc if possible?]
Architecture recommendation for cancellable backend calls [zerorpc if possible?]
我有一个电子应用程序前端 (zerorpc-node) 与 python 后端 (zerorpc-python) 通信,需要:
a) 能够向后端发送请求[标准 zerorpc 调用]
b) 能够同时 运行 多个后端进程 [遵循 https://github.com/0rpc/zerorpc-node/issues/96 中的架构)
c) 能够随意取消后端进程[不确定如何使用当前架构执行此操作]
任何有关如何为 (c) 构建解决方案的指导都将非常有用。如果有必要,我愿意放弃 zerorpc,如果它是有限的,但如果解决方案涉及使用 zerorpc,那就太棒了。
我最终使用 gipc
来启动进程。取消机制依赖于这样一个事实,即当 gipc
进程终止时,管道将关闭。整个API很复杂,这就是我最后得到的:
class ZerorpcService():
def __init__(self):
self.participant_id = None
self.extraction_methods = []
# maps pid to (process, pipe writer)
self.processes = {}
self. = lock.Semaphore()
def _launch_process(self, function, kwargs):
"""
Launches a new process
"""
try:
# add required arguments
pid = kwargs["pid"]
# start independent gipc process, communicated via pipe
started = False
with gipc.pipe() as (r, w):
with self.mutex:
if pid in self.processes:
return_value = {'status': 1, 'error': 'pid already exists', "report": True}
return
proc = gipc.start_process(self._process_wrapper, args=(function, kwargs, w))
self.processes[pid] = proc
started = True
# wait for process to send something over pipe
return_value = r.get()
except EOFError as eof:
# happens when we terminate a process because the pipe closes
return_value = {'status': 1, 'error': "pid {} terminated".format(pid), "report": False}
except Exception as error:
logging.exception(error)
return_value = {'status': 1, 'error': str(error), 'traceback': traceback.format_exc(), "report": True}
finally:
# deletes the pid from the map
with self.mutex:
if started:
del self.processes[pid]
return return_value
@staticmethod
def _process_wrapper(function, kwargs, pipe):
"""
Executes f with kwargs and formats the result into a dict.
Wraps it in error handling.
Routes the return value through the pipe provided.
"""
return_val = {'status': 0}
try:
raw_val = function(**kwargs)
if raw_val is not None:
return_val = raw_val
except Exception as error:
logging.exception(error)
return_val = {'status': 1, 'error': str(error), 'traceback': traceback.format_exc(), "report": True}
finally:
pipe.put(return_val)
def cancel_process(self, pid):
if pid in self.processes:
with self.mutex:
process = self.processes[pid]
if process.is_alive():
process.terminate()
return {'status': 0}
else:
return {'status': 1, 'error': 'pid {} not found'.format(pid), "traceback": traceback.format_exc(),
"report": True}
我有一个电子应用程序前端 (zerorpc-node) 与 python 后端 (zerorpc-python) 通信,需要:
a) 能够向后端发送请求[标准 zerorpc 调用]
b) 能够同时 运行 多个后端进程 [遵循 https://github.com/0rpc/zerorpc-node/issues/96 中的架构)
c) 能够随意取消后端进程[不确定如何使用当前架构执行此操作]
任何有关如何为 (c) 构建解决方案的指导都将非常有用。如果有必要,我愿意放弃 zerorpc,如果它是有限的,但如果解决方案涉及使用 zerorpc,那就太棒了。
我最终使用 gipc
来启动进程。取消机制依赖于这样一个事实,即当 gipc
进程终止时,管道将关闭。整个API很复杂,这就是我最后得到的:
class ZerorpcService():
def __init__(self):
self.participant_id = None
self.extraction_methods = []
# maps pid to (process, pipe writer)
self.processes = {}
self. = lock.Semaphore()
def _launch_process(self, function, kwargs):
"""
Launches a new process
"""
try:
# add required arguments
pid = kwargs["pid"]
# start independent gipc process, communicated via pipe
started = False
with gipc.pipe() as (r, w):
with self.mutex:
if pid in self.processes:
return_value = {'status': 1, 'error': 'pid already exists', "report": True}
return
proc = gipc.start_process(self._process_wrapper, args=(function, kwargs, w))
self.processes[pid] = proc
started = True
# wait for process to send something over pipe
return_value = r.get()
except EOFError as eof:
# happens when we terminate a process because the pipe closes
return_value = {'status': 1, 'error': "pid {} terminated".format(pid), "report": False}
except Exception as error:
logging.exception(error)
return_value = {'status': 1, 'error': str(error), 'traceback': traceback.format_exc(), "report": True}
finally:
# deletes the pid from the map
with self.mutex:
if started:
del self.processes[pid]
return return_value
@staticmethod
def _process_wrapper(function, kwargs, pipe):
"""
Executes f with kwargs and formats the result into a dict.
Wraps it in error handling.
Routes the return value through the pipe provided.
"""
return_val = {'status': 0}
try:
raw_val = function(**kwargs)
if raw_val is not None:
return_val = raw_val
except Exception as error:
logging.exception(error)
return_val = {'status': 1, 'error': str(error), 'traceback': traceback.format_exc(), "report": True}
finally:
pipe.put(return_val)
def cancel_process(self, pid):
if pid in self.processes:
with self.mutex:
process = self.processes[pid]
if process.is_alive():
process.terminate()
return {'status': 0}
else:
return {'status': 1, 'error': 'pid {} not found'.format(pid), "traceback": traceback.format_exc(),
"report": True}