Implement Per-Function Maximum Number of Concurrent Calls to Python XML-RPC Server
I have a Python-based SimpleXMLRPCServer, similar to:
from multiprocessing import Process
from SimpleXMLRPCServer import SimpleXMLRPCServer
from SimpleXMLRPCServer import SimpleXMLRPCRequestHandler
import SocketServer

class RPCThreading(SocketServer.ThreadingMixIn, SimpleXMLRPCServer):
    pass

# Restrict to a particular path.
class RequestHandler(SimpleXMLRPCRequestHandler):
    rpc_paths = ('/RPC2',)

def main():
    server = RPCThreading(('127.0.0.1', 8000), requestHandler=RequestHandler)
    server.register_function(tester1)
    server.register_function(tester2)
    print("Server running...")
    server.serve_forever()

def tester1(id):
    p = Process(target=my_func1, args=(id,))
    p.start()
    return True

def tester2(id):
    p = Process(target=my_func2, args=(id,))
    p.start()
    return True
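For reference, a client invokes these functions over XML-RPC roughly like this (a minimal Python 2 sketch; it assumes the server above is running locally and only illustrates the call pattern):

import xmlrpclib

# Connect on the path that RequestHandler restricts requests to.
proxy = xmlrpclib.ServerProxy('http://127.0.0.1:8000/RPC2')

# Each call returns almost immediately, because the server only forks a
# worker process and then replies True.
print(proxy.tester1(42))
print(proxy.tester2(43))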
I want a way to track the number of concurrent processes currently executing for tester1 and tester2, and if more than a maximum (user-defined) number are still running, queue each new request and execute it once the count drops below the threshold. Perhaps a shared Pool per function?
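One way to read that idea is to give each RPC function its own multiprocessing.Pool sized to its limit and submit work with apply_async; calls beyond the limit simply wait in the pool's internal task queue. A rough sketch under that assumption (the pools and the stub bodies are illustrative, not part of the original code):

from multiprocessing import Pool

def my_func1(id):
    pass  # stands in for the real per-request work

def my_func2(id):
    pass  # stands in for the real per-request work

# Hypothetical per-function pools: each one caps how many worker
# processes may run that function concurrently.
tester1_pool = Pool(processes=10)
tester2_pool = Pool(processes=10)

def tester1(id):
    # apply_async returns immediately; excess calls sit in the pool's
    # queue until one of its workers becomes free.
    tester1_pool.apply_async(my_func1, (id,))
    return True

def tester2(id):
    tester2_pool.apply_async(my_func2, (id,))
    return True

The trade-off versus the code above is that a Pool reuses a fixed set of pre-forked workers rather than spawning a fresh Process for every request.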
The following seems to do what I want:
from multiprocessing import Process, JoinableQueue
from SimpleXMLRPCServer import SimpleXMLRPCServer
from SimpleXMLRPCServer import SimpleXMLRPCRequestHandler
import SocketServer
import threading

tester1_queue = JoinableQueue()
tester2_queue = JoinableQueue()
tester1_max_concurrent = 10
tester2_max_concurrent = 10

class RPCThreading(SocketServer.ThreadingMixIn, SimpleXMLRPCServer):
    pass

# Restrict to a particular path.
class RequestHandler(SimpleXMLRPCRequestHandler):
    rpc_paths = ('/RPC2',)

def main():
    # spin up tester1 queue watcher threads
    for i in range(tester1_max_concurrent):
        worker = threading.Thread(target=tester1_queue_watcher, args=(tester1_queue,))
        worker.daemon = True
        worker.start()

    # spin up tester2 queue watcher threads
    for i in range(tester2_max_concurrent):
        worker = threading.Thread(target=tester2_queue_watcher, args=(tester2_queue,))
        worker.daemon = True
        worker.start()

    server = RPCThreading(('127.0.0.1', 8000), requestHandler=RequestHandler)
    server.register_function(tester1_handler, 'tester1')
    server.register_function(tester2_handler, 'tester2')
    print("Server running...")
    server.serve_forever()

def tester1_handler(id):
    # enqueue the bare id; a watcher thread wraps it into the args tuple
    tester1_queue.put(id)
    return True

def tester1_queue_watcher(q):
    while True:
        id = q.get()
        p = Process(target=tester1, args=(id,))
        p.start()
        p.join()  # blocking here limits each watcher thread to one process at a time
        q.task_done()

def tester1(id):
    # do stuff
    pass

def tester2_handler(id):
    tester2_queue.put(id)
    return True

def tester2_queue_watcher(q):
    while True:
        id = q.get()
        p = Process(target=tester2, args=(id,))
        p.start()
        p.join()
        q.task_done()

def tester2(id):
    # do stuff
    pass

if __name__ == '__main__':
    main()
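This works because each watcher thread blocks in p.join(): with tester1_max_concurrent watchers draining tester1_queue, at most that many tester1 processes exist at any moment, and further requests simply wait in the JoinableQueue. Since the two watchers differ only in the function they launch, they could also be collapsed into one generic watcher; a small refactoring sketch of the same idea (not a different design):

from multiprocessing import Process
import threading

def queue_watcher(q, target):
    while True:
        id = q.get()
        p = Process(target=target, args=(id,))
        p.start()
        p.join()  # blocking here is what enforces the per-function cap
        q.task_done()

# Started from main() the same way as before, e.g. for tester1:
#   for i in range(tester1_max_concurrent):
#       worker = threading.Thread(target=queue_watcher,
#                                 args=(tester1_queue, tester1))
#       worker.daemon = True
#       worker.start()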