http客户端关闭连接时如何存储响应?
How to store response when http client closed a connection?
我在 python 中使用 falcon 框架来形成 json 网络响应 api。
例如,我有一个名为 logic()
的函数,可以运行 30-90 分钟。我想要这样的东西:
- 当 http-client 请求 /api/somepath.json 我们调用
somepath_handle()
somepath_handle()
在另一个 thread/process 中运行 logic()
- 当
logic()
完成时,线程关闭
somepath_handle()
从 return 读取 logic()
的响应
- 如果
somepath_handle()
在 logic()
完成之前被杀死,那么 thread/etc 和 logic()
在完成之前不会停止
代码:
def somepath_handle():
run_async_logic()
response=wait_for_async_logic_response() # read response of logic()
return_response(response)
如果您的过程需要这么长时间,我建议您使用电子邮件或实时通知系统将结果发送给用户?
我正在使用一个简单的工作程序来创建我正在处理一些命令的队列。如果添加简单的响应存储,那么将有可能处理任何请求并且在连接丢失时不会丢失它们。
示例:
它是使用 falconframework.org 响应请求的主要功能。
main.py:
from flow import Flow
import falcon
import threading
import storage
__version__ = 0.1
__author__ = 'weldpua2008@gmail.com'
app = falcon.API(
media_type='application/json')
app.add_route('/flow', Flow())
THREADS_COUNT = 1
# adding the workers to process queue of command
worker = storage.worker
for _ in xrange(THREADS_COUNT):
thread = threading.Thread(target=worker)
thread.daemon = True
thread.start()
这是简单的存储工作代码
storage.py:
from Queue import Queue
import subprocess
import logging
main_queque = Queue()
def worker():
global main_roles_queque
while True:
try:
cmd = main_queque.get()
#do_work(item)
#time.sleep(5)
handler = subprocess.Popen(
cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True)
stdout, stderr = handler.communicate()
logging.critical("[queue_worker]: stdout:%s, stderr:%s, cmd:%s" %(stdout, stderr, cmd))
main_queque.task_done()
except Exception as error:
logging.critical("[queue_worker:error] %s" %(error))
class 将处理任何请求 [POST,GET]
flow.py:
import storage
import json
import falcon
import random
class Flow(object):
def on_get(self, req, resp):
storage_value = storage.main_queque.qsize()
msg = {"qsize": storage_value}
resp.body = json.dumps(msg, sort_keys=True, indent=4)
resp.status = falcon.HTTP_200
#curl -H "Content-Type: application/json" -d '{}' http://10.206.102.81:8888/flow
def on_post(self, req, resp):
r = random.randint(1, 10000000000000)
cmd = 'sleep 1;echo "ss %s"' % str(r)
storage.main_queque.put(cmd)
storage_value = cmd
msg = {"value": storage_value}
resp.body = json.dumps(msg, sort_keys=True, indent=4)
resp.status = falcon.HTTP_200
我在 python 中使用 falcon 框架来形成 json 网络响应 api。
例如,我有一个名为 logic()
的函数,可以运行 30-90 分钟。我想要这样的东西:
- 当 http-client 请求 /api/somepath.json 我们调用
somepath_handle()
somepath_handle()
在另一个 thread/process 中运行 - 当
logic()
完成时,线程关闭 somepath_handle()
从 return 读取 - 如果
somepath_handle()
在logic()
完成之前被杀死,那么 thread/etc 和logic()
在完成之前不会停止
logic()
logic()
的响应
代码:
def somepath_handle():
run_async_logic()
response=wait_for_async_logic_response() # read response of logic()
return_response(response)
如果您的过程需要这么长时间,我建议您使用电子邮件或实时通知系统将结果发送给用户?
我正在使用一个简单的工作程序来创建我正在处理一些命令的队列。如果添加简单的响应存储,那么将有可能处理任何请求并且在连接丢失时不会丢失它们。
示例: 它是使用 falconframework.org 响应请求的主要功能。
main.py:
from flow import Flow
import falcon
import threading
import storage
__version__ = 0.1
__author__ = 'weldpua2008@gmail.com'
app = falcon.API(
media_type='application/json')
app.add_route('/flow', Flow())
THREADS_COUNT = 1
# adding the workers to process queue of command
worker = storage.worker
for _ in xrange(THREADS_COUNT):
thread = threading.Thread(target=worker)
thread.daemon = True
thread.start()
这是简单的存储工作代码 storage.py:
from Queue import Queue
import subprocess
import logging
main_queque = Queue()
def worker():
global main_roles_queque
while True:
try:
cmd = main_queque.get()
#do_work(item)
#time.sleep(5)
handler = subprocess.Popen(
cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True)
stdout, stderr = handler.communicate()
logging.critical("[queue_worker]: stdout:%s, stderr:%s, cmd:%s" %(stdout, stderr, cmd))
main_queque.task_done()
except Exception as error:
logging.critical("[queue_worker:error] %s" %(error))
class 将处理任何请求 [POST,GET] flow.py:
import storage
import json
import falcon
import random
class Flow(object):
def on_get(self, req, resp):
storage_value = storage.main_queque.qsize()
msg = {"qsize": storage_value}
resp.body = json.dumps(msg, sort_keys=True, indent=4)
resp.status = falcon.HTTP_200
#curl -H "Content-Type: application/json" -d '{}' http://10.206.102.81:8888/flow
def on_post(self, req, resp):
r = random.randint(1, 10000000000000)
cmd = 'sleep 1;echo "ss %s"' % str(r)
storage.main_queque.put(cmd)
storage_value = cmd
msg = {"value": storage_value}
resp.body = json.dumps(msg, sort_keys=True, indent=4)
resp.status = falcon.HTTP_200