使用 Deferred 的 Klein 应用程序
Klein app with deferred
我正在探索 Klein 和 Deferred。在下面的示例中,我尝试使用子进程将一个数字加一,并通过 Future 返回结果。我能够收到 Future 的回调。
问题是 Deferred 对象始终没有调用 cb() 函数,并且对该端点的请求也始终不返回。请帮我找出问题所在。
以下是我的server.py代码
from klein import Klein
from twisted.internet.defer import inlineCallbacks, returnValue
import Process4
if __name__ == '__main__':
    app = Klein()

    @app.route('/visit')
    @inlineCallbacks
    def get_num_visit(request):
        """Handle ``/visit``: wait for the visitor number and render it.

        Yields the Deferred returned by ``Process4.get_visitor_num()``.
        Responds with status 200 and the number on success, or with
        status 500 and the error text if waiting on it raises.
        """
        try:
            resp = yield Process4.get_visitor_num()
            # ``request`` is the only request object in scope here; there is
            # no ``req`` name, so using it would raise NameError.
            request.setResponseCode(200)
            returnValue('Visited = {}'.format(resp))
        except Exception as e:
            request.setResponseCode(500)
            returnValue('error {}'.format(e))

    print('starting server')
    app.run('0.0.0.0', 5005)
以下是Process4.py代码
from multiprocessing import Process
from concurrent.futures import Future
from time import sleep
from twisted.internet.defer import Deferred
def foo(x, delay=3):
    """Return ``x + 1`` after sleeping for ``delay`` seconds.

    Args:
        x: Number to increment.
        delay: Seconds to sleep before returning. Defaults to 3, the pause
            that used to be hard-coded.

    Returns:
        ``x + 1``.
    """
    result = x + 1
    sleep(delay)
    return result
class MyProcess(Process):
    """Process that runs ``target`` on a visit counter and stores the result in a Future."""

    def __init__(self, target, args):
        """Store the callable and its arguments, and create the result Future.

        Args:
            target: Callable that ``run`` invokes with the current visit count.
            args: Stored on the instance; ``run`` does not use it.
        """
        super().__init__()
        self.target = target
        self.args = args
        self.f = Future()
        self.visit = 0

    def run(self):
        """Call ``self.target(self.visit)`` and set its return value on ``self.f``.

        NOTE: when the process is launched with ``start()``, this method runs
        in the child process, so the Future resolved here is the child's copy;
        the parent process's ``self.f`` is not completed by it.
        """
        # Use the callable given to the constructor rather than a hard-coded one.
        r = self.target(self.visit)
        self.f.set_result(result=r)
def cb(result):
    """Print the visitor number and hand the same value on unchanged."""
    message = 'visitor number {}'.format(result)
    print(message)
    return result
def eb(err):
    """Print the error and return the same object unchanged."""
    message = 'error occurred {}'.format(err)
    print(message)
    return err
def future_to_deferred(future):
    """Return a new Deferred that is fired from ``future``'s done-callback.

    When ``future`` finishes, its exception (if any) is passed to the
    Deferred's ``errback``; otherwise its result is passed to ``callback``.
    """
    deferred = Deferred()

    def _relay(done):
        # Forward the finished future's outcome to the Deferred.
        error = done.exception()
        if not error:
            deferred.callback(done.result())
            return
        deferred.errback(error)

    future.add_done_callback(_relay)
    return deferred
def get_visitor_num():
    """Start a ``MyProcess`` running ``foo`` and return a Deferred for its Future.

    The Deferred is built from the process object's Future before the process
    is started; ``cb`` and ``eb`` are attached after ``start()``.
    """
    worker = MyProcess(target=foo, args=None)
    deferred = future_to_deferred(worker.f)
    worker.start()
    # Handlers are registered only after the process has been launched.
    deferred.addCallback(cb)
    deferred.addErrback(eb)
    # NOTE(review): this blocks the calling thread for one second.
    sleep(1)
    return deferred
编辑 1
在启动进程 p1 之前添加回调,解决了 cb() 函数没有被调用的问题。但是向端点发出的 HTTP 请求仍然没有返回。
原来,在 run() 方法中通过 self.f.set_result(result=r) 设置 Future 的结果时,触发的是子进程中的 callback() 方法,而子进程中并没有线程在等待返回结果!
因此,为了在 MainProcess 中触发 callback() 函数,我必须在 MainProcess 中启动一个工作线程,通过多进程队列(multiprocessing.Queue)从子进程取回结果,然后再设置 Future 的结果。
@notorious.no 谢谢回复。我注意到一点:在我修改后的代码中,reactor.callFromThread 确实会把结果从工作线程切换到 MainThread 中处理;而直接调用 d.callback(f.result()) 虽然也能正常工作,但结果是在工作线程中返回的。
以下是修改后的工作代码
server.py
from klein import Klein
from twisted.internet.defer import inlineCallbacks, returnValue
import Process4
if __name__ == '__main__':
    app = Klein()
    visit_count = 0

    @app.route('/visit')
    @inlineCallbacks
    def get_num_visit(req):
        """Handle ``/visit``: compute the next visit count and render it.

        Passes the current module-level ``visit_count`` to
        ``Process4.get_visitor_num`` and waits for the result. On success it
        responds with 200 and stores the result back in ``visit_count``; if
        an exception is raised it responds with 500 and the error text.
        """
        global visit_count
        try:
            new_count = yield Process4.get_visitor_num(visit_count)
            req.setResponseCode(200)
            visit_count = new_count
            body = 'Visited = {}'.format(new_count)
            returnValue(body)
        except Exception as exc:
            req.setResponseCode(500)
            failure_text = 'error {}'.format(exc)
            returnValue(failure_text)

    print('starting server')
    app.run('0.0.0.0', 5005)
Process4.py
from multiprocessing import Process, Queue
from concurrent.futures import Future
from time import sleep
from twisted.internet.defer import Deferred
import threading
from twisted.internet import reactor
def foo(x, q, delay=3):
    """Put ``x + 1`` on ``q`` after sleeping for ``delay`` seconds.

    Args:
        x: Number to increment.
        q: Queue-like object; the result is delivered with ``q.put``.
        delay: Seconds to sleep before publishing the result. Defaults to 3,
            the pause that used to be hard-coded.
    """
    result = x + 1
    sleep(delay)
    print('setting result, {}'.format(result))
    q.put(result)
class MyProcess(Process):
    """Process whose ``run`` invokes the stored ``target`` with the stored ``args``."""

    def __init__(self, target, args):
        """Store the callable and its positional arguments.

        Args:
            target: Callable executed by ``run``.
            args: Iterable of positional arguments for ``target``, or None
                for no arguments.
        """
        super().__init__()
        self.target = target
        self.args = args
        self.visit = 0

    def run(self):
        """Call ``self.target`` with ``self.args`` unpacked as positional arguments."""
        # Treat a missing argument list as empty so ``args=None`` does not
        # fail on unpacking.
        self.target(*(self.args or ()))
def future_to_deferred(future):
    """Return a new Deferred that is fired when ``future`` completes.

    The future's done-callback runs in whichever thread completes the future,
    so both outcomes are handed to the reactor with
    ``reactor.callFromThread`` instead of firing the Deferred directly.

    Args:
        future: Object providing ``add_done_callback``, ``exception`` and
            ``result`` (e.g. ``concurrent.futures.Future``).

    Returns:
        The Deferred that receives the future's result or exception.
    """
    d = Deferred()

    def callback(f):
        e = f.exception()
        print('inside callback {}'.format(threading.current_thread().name))
        if e:
            print('calling errback')
            # Route the failure through the reactor, same as the success path.
            reactor.callFromThread(d.errback, e)
        else:
            print('calling callback with result {}'.format(f.result()))
            reactor.callFromThread(d.callback, f.result())

    future.add_done_callback(callback)
    return d
def wait(q, f):
    """Block until an item is available on ``q`` and set it as the result of ``f``."""
    f.set_result(q.get(block=True))
def get_visitor_num(x):
    """Run ``foo(x, queue)`` in a ``MyProcess`` and return a Deferred for the result.

    A thread running ``wait`` is started to move the value from the queue
    into a Future, and that Future is bridged to the returned Deferred with
    ``future_to_deferred``. The process is started last, after the logging
    callback and errback have been attached.
    """
    def cb(result):
        # Log the value together with the name of the thread running this callback.
        thread_name = threading.current_thread().name
        print('inside cb visitor number {} {}'.format(result, thread_name))
        return result

    def eb(err):
        # Log the failure and hand it on unchanged.
        print('inside eb error occurred {}'.format(err))
        return err

    result_future = Future()
    result_queue = Queue()
    worker = MyProcess(target=foo, args=(x, result_queue,))

    # The waiter thread blocks on the queue until the worker publishes a value.
    waiter = threading.Thread(target=wait, args=(result_queue, result_future,))
    waiter.start()

    deferred = future_to_deferred(result_future)
    deferred.addCallback(cb)
    deferred.addErrback(eb)

    worker.start()
    print('returning deferred')
    return deferred
我正在探索 Klein 和 Deferred。在下面的示例中,我尝试使用子进程将一个数字加一,并通过 Future 返回结果。我能够收到 Future 的回调。
问题是 Deferred 对象始终没有调用 cb() 函数,并且对该端点的请求也始终不返回。请帮我找出问题所在。
以下是我的server.py代码
from klein import Klein
from twisted.internet.defer import inlineCallbacks, returnValue
import Process4
if __name__ == '__main__':
    app = Klein()

    @app.route('/visit')
    @inlineCallbacks
    def get_num_visit(request):
        """Handle ``/visit``: wait for the visitor number and render it.

        Yields the Deferred returned by ``Process4.get_visitor_num()``.
        Responds with status 200 and the number on success, or with
        status 500 and the error text if waiting on it raises.
        """
        try:
            resp = yield Process4.get_visitor_num()
            # ``request`` is the only request object in scope here; there is
            # no ``req`` name, so using it would raise NameError.
            request.setResponseCode(200)
            returnValue('Visited = {}'.format(resp))
        except Exception as e:
            request.setResponseCode(500)
            returnValue('error {}'.format(e))

    print('starting server')
    app.run('0.0.0.0', 5005)
以下是Process4.py代码
from multiprocessing import Process
from concurrent.futures import Future
from time import sleep
from twisted.internet.defer import Deferred
def foo(x, delay=3):
    """Return ``x + 1`` after sleeping for ``delay`` seconds.

    Args:
        x: Number to increment.
        delay: Seconds to sleep before returning. Defaults to 3, the pause
            that used to be hard-coded.

    Returns:
        ``x + 1``.
    """
    result = x + 1
    sleep(delay)
    return result
class MyProcess(Process):
    """Process that runs ``target`` on a visit counter and stores the result in a Future."""

    def __init__(self, target, args):
        """Store the callable and its arguments, and create the result Future.

        Args:
            target: Callable that ``run`` invokes with the current visit count.
            args: Stored on the instance; ``run`` does not use it.
        """
        super().__init__()
        self.target = target
        self.args = args
        self.f = Future()
        self.visit = 0

    def run(self):
        """Call ``self.target(self.visit)`` and set its return value on ``self.f``.

        NOTE: when the process is launched with ``start()``, this method runs
        in the child process, so the Future resolved here is the child's copy;
        the parent process's ``self.f`` is not completed by it.
        """
        # Use the callable given to the constructor rather than a hard-coded one.
        r = self.target(self.visit)
        self.f.set_result(result=r)
def cb(result):
    """Print the visitor number and hand the same value on unchanged."""
    message = 'visitor number {}'.format(result)
    print(message)
    return result
def eb(err):
    """Print the error and return the same object unchanged."""
    message = 'error occurred {}'.format(err)
    print(message)
    return err
def future_to_deferred(future):
    """Return a new Deferred that is fired from ``future``'s done-callback.

    When ``future`` finishes, its exception (if any) is passed to the
    Deferred's ``errback``; otherwise its result is passed to ``callback``.
    """
    deferred = Deferred()

    def _relay(done):
        # Forward the finished future's outcome to the Deferred.
        error = done.exception()
        if not error:
            deferred.callback(done.result())
            return
        deferred.errback(error)

    future.add_done_callback(_relay)
    return deferred
def get_visitor_num():
    """Start a ``MyProcess`` running ``foo`` and return a Deferred for its Future.

    The Deferred is built from the process object's Future before the process
    is started; ``cb`` and ``eb`` are attached after ``start()``.
    """
    worker = MyProcess(target=foo, args=None)
    deferred = future_to_deferred(worker.f)
    worker.start()
    # Handlers are registered only after the process has been launched.
    deferred.addCallback(cb)
    deferred.addErrback(eb)
    # NOTE(review): this blocks the calling thread for one second.
    sleep(1)
    return deferred
编辑 1
在启动进程 p1 之前添加回调,解决了 cb() 函数没有被调用的问题。但是向端点发出的 HTTP 请求仍然没有返回。
原来,在 run() 方法中通过 self.f.set_result(result=r) 设置 Future 的结果时,触发的是子进程中的 callback() 方法,而子进程中并没有线程在等待返回结果!
因此,为了在 MainProcess 中触发 callback() 函数,我必须在 MainProcess 中启动一个工作线程,通过多进程队列(multiprocessing.Queue)从子进程取回结果,然后再设置 Future 的结果。
@notorious.no 谢谢回复。我注意到一点:在我修改后的代码中,reactor.callFromThread 确实会把结果从工作线程切换到 MainThread 中处理;而直接调用 d.callback(f.result()) 虽然也能正常工作,但结果是在工作线程中返回的。
以下是修改后的工作代码
server.py
from klein import Klein
from twisted.internet.defer import inlineCallbacks, returnValue
import Process4
if __name__ == '__main__':
    app = Klein()
    visit_count = 0

    @app.route('/visit')
    @inlineCallbacks
    def get_num_visit(req):
        """Handle ``/visit``: compute the next visit count and render it.

        Passes the current module-level ``visit_count`` to
        ``Process4.get_visitor_num`` and waits for the result. On success it
        responds with 200 and stores the result back in ``visit_count``; if
        an exception is raised it responds with 500 and the error text.
        """
        global visit_count
        try:
            new_count = yield Process4.get_visitor_num(visit_count)
            req.setResponseCode(200)
            visit_count = new_count
            body = 'Visited = {}'.format(new_count)
            returnValue(body)
        except Exception as exc:
            req.setResponseCode(500)
            failure_text = 'error {}'.format(exc)
            returnValue(failure_text)

    print('starting server')
    app.run('0.0.0.0', 5005)
Process4.py
from multiprocessing import Process, Queue
from concurrent.futures import Future
from time import sleep
from twisted.internet.defer import Deferred
import threading
from twisted.internet import reactor
def foo(x, q, delay=3):
    """Put ``x + 1`` on ``q`` after sleeping for ``delay`` seconds.

    Args:
        x: Number to increment.
        q: Queue-like object; the result is delivered with ``q.put``.
        delay: Seconds to sleep before publishing the result. Defaults to 3,
            the pause that used to be hard-coded.
    """
    result = x + 1
    sleep(delay)
    print('setting result, {}'.format(result))
    q.put(result)
class MyProcess(Process):
    """Process whose ``run`` invokes the stored ``target`` with the stored ``args``."""

    def __init__(self, target, args):
        """Store the callable and its positional arguments.

        Args:
            target: Callable executed by ``run``.
            args: Iterable of positional arguments for ``target``, or None
                for no arguments.
        """
        super().__init__()
        self.target = target
        self.args = args
        self.visit = 0

    def run(self):
        """Call ``self.target`` with ``self.args`` unpacked as positional arguments."""
        # Treat a missing argument list as empty so ``args=None`` does not
        # fail on unpacking.
        self.target(*(self.args or ()))
def future_to_deferred(future):
    """Return a new Deferred that is fired when ``future`` completes.

    The future's done-callback runs in whichever thread completes the future,
    so both outcomes are handed to the reactor with
    ``reactor.callFromThread`` instead of firing the Deferred directly.

    Args:
        future: Object providing ``add_done_callback``, ``exception`` and
            ``result`` (e.g. ``concurrent.futures.Future``).

    Returns:
        The Deferred that receives the future's result or exception.
    """
    d = Deferred()

    def callback(f):
        e = f.exception()
        print('inside callback {}'.format(threading.current_thread().name))
        if e:
            print('calling errback')
            # Route the failure through the reactor, same as the success path.
            reactor.callFromThread(d.errback, e)
        else:
            print('calling callback with result {}'.format(f.result()))
            reactor.callFromThread(d.callback, f.result())

    future.add_done_callback(callback)
    return d
def wait(q, f):
    """Block until an item is available on ``q`` and set it as the result of ``f``."""
    f.set_result(q.get(block=True))
def get_visitor_num(x):
    """Run ``foo(x, queue)`` in a ``MyProcess`` and return a Deferred for the result.

    A thread running ``wait`` is started to move the value from the queue
    into a Future, and that Future is bridged to the returned Deferred with
    ``future_to_deferred``. The process is started last, after the logging
    callback and errback have been attached.
    """
    def cb(result):
        # Log the value together with the name of the thread running this callback.
        thread_name = threading.current_thread().name
        print('inside cb visitor number {} {}'.format(result, thread_name))
        return result

    def eb(err):
        # Log the failure and hand it on unchanged.
        print('inside eb error occurred {}'.format(err))
        return err

    result_future = Future()
    result_queue = Queue()
    worker = MyProcess(target=foo, args=(x, result_queue,))

    # The waiter thread blocks on the queue until the worker publishes a value.
    waiter = threading.Thread(target=wait, args=(result_queue, result_future,))
    waiter.start()

    deferred = future_to_deferred(result_future)
    deferred.addCallback(cb)
    deferred.addErrback(eb)

    worker.start()
    print('returning deferred')
    return deferred