CherryPy: how to stop and buffer incoming requests while data is updated
I am using CherryPy in a server that implements a RESTful-like API. The responses involve some heavy computation that takes about 2 seconds per request. For that computation, some data is used that is updated three times a day.
The data is updated in the background (it takes about half an hour), and once it is updated, the reference to the new data is passed to the functions that answer the requests. That takes just a millisecond.
What I need is to make sure that every request is answered with either the old data or the new data, but that no request is processed while the data reference is being changed. Ideally, I would like to find a way to buffer the incoming requests while the data reference changes, and to make sure that the reference is changed only once all in-process requests have finished.
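(For illustration only, here is a rough sketch of that "buffer while swapping" idea using a condition variable; the SwapGate class and its method names are hypothetical and not part of my code.)

import threading

class SwapGate(object):
    """Hypothetical helper: readers are counted; the writer waits until
    in-flight readers drain, and new readers wait while a swap is running."""

    def __init__(self):
        self._cond = threading.Condition()
        self._readers = 0
        self._swapping = False

    def reader_enter(self):
        with self._cond:
            while self._swapping:           # buffer new requests during the swap
                self._cond.wait()
            self._readers += 1

    def reader_exit(self):
        with self._cond:
            self._readers -= 1
            self._cond.notify_all()         # the writer may be waiting for us

    def swap(self, do_swap):
        with self._cond:
            self._swapping = True
            while self._readers:            # wait for in-flight requests to finish
                self._cond.wait()
            try:
                do_swap()                   # change the data reference here
            finally:
                self._swapping = False
                self._cond.notify_all()     # release the buffered readers

A request handler would wrap its use of the data in reader_enter()/reader_exit(), and the updater would pass the reference swap to swap().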
My current (not yet working) minimal example is the following:
import time

import cherrypy
from cherrypy.process import plugins

theData = 0

def processData():
    """Background task: runs for half an hour three times a day,
    and when it finishes it publishes the new data on the engine bus."""
    global theData  # using a global variable to simplify the example
    theData += 1
    cherrypy.engine.publish("doChangeData", theData)

class DataPublisher(object):

    def __init__(self):
        self.data = 'initData'
        cherrypy.engine.subscribe('doChangeData', self.changeData)

    def changeData(self, newData):
        cherrypy.engine.log("Changing data, buffering should start!")
        self.data = newData
        time.sleep(1)  # exaggeration of the ~1 ms reference update, to make the problem visible
        cherrypy.engine.log("Continue serving buffered and new requests.")

    @cherrypy.expose
    def index(self):
        result = "I get " + str(self.data)
        cherrypy.engine.log(result)
        time.sleep(3)
        return result

if __name__ == '__main__':
    conf = {
        '/': {'server.socket_host': '127.0.0.1',
              'server.socket_port': 8080}
    }
    cherrypy.config.update(conf)
    btask = plugins.BackgroundTask(5, processData)  # 5 seconds for the example
    btask.start()
    cherrypy.quickstart(DataPublisher())
If I run this script, open a browser at localhost:8080 and refresh the page many times, I get:
...
[17/Sep/2015:21:32:41] ENGINE Changing data, buffering should start!
127.0.0.1 - - [17/Sep/2015:21:32:41] "GET / HTTP/1.1" 200 7 "...
[17/Sep/2015:21:32:42] ENGINE I get 3
[17/Sep/2015:21:32:42] ENGINE Continue serving buffered and new requests.
127.0.0.1 - - [17/Sep/2015:21:24:44] "GET / HTTP/1.1" 200 7 "...
...
This means that some request processing started before the data reference change and finished after that change had started (or finished). I want to avoid both cases. I would like something like:
...
127.0.0.1 - - [17/Sep/2015:21:32:41] "GET / HTTP/1.1" 200 7 "...
[17/Sep/2015:21:32:41] ENGINE Changing data, buffering should start!
[17/Sep/2015:21:32:42] ENGINE Continue serving buffered and new requests.
[17/Sep/2015:21:32:42] ENGINE I get 3
127.0.0.1 - - [17/Sep/2015:21:24:44] "GET / HTTP/1.1" 200 7 "...
...
I have searched the documentation and the web, and found these references, which do not fully cover this case:
http://www.defuze.org/archives/198-managing-your-process-with-the-cherrypy-bus.html
How to execute asynchronous post-processing in CherryPy?
http://tools.cherrypy.org/wiki/BackgroundTaskQueue
Cherrypy : which solutions for pages with large processing time
How to stop request processing in Cherrypy?
UPDATE (with a simple solution):
After more careful thought, I think the question is misleading because it includes some implementation requirements in the question itself, namely: to stop processing and to start buffering. For the problem at hand, the requirement can be simplified to: make sure that every request is processed with either the old data or the new data.

For the latter, it is enough to store a temporary local reference to the data in use. This reference can be used throughout the whole request processing, and it does not matter if self.data is changed by another thread in the meantime. For Python objects, the garbage collector will take care of the old data.

Specifically, it is enough to change the index function to:
@cherrypy.expose
def index(self):
    tempData = self.data
    result = "I started with %s" % str(tempData)
    time.sleep(3)  # heavy use of tempData
    result += " that changed to %s" % str(self.data)
    result += " but I am still using %s" % str(tempData)
    cherrypy.engine.log(result)
    return result
As a result we will see:
[21/Sep/2015:10:06:00] ENGINE I started with 1 that changed to 2 but I am still using 1
I still want to keep the original (more restrictive) question and cyraxjoe's answer, because I found those solutions very useful.
I will explain two approaches to solve the problem.

The first one is plugin based.

The plugin-based approach still requires some kind of synchronization. It works only because there is a single BackgroundTask making the modification (and it is just one atomic operation).
import time
import threading

import cherrypy
from cherrypy.process import plugins

UPDATE_INTERVAL = 0.5
REQUEST_DELAY = 0.1
UPDATE_DELAY = 0.1
THREAD_POOL_SIZE = 20

next_data = 1

class DataGateway(plugins.SimplePlugin):

    def __init__(self, bus):
        super(DataGateway, self).__init__(bus)
        self.data = next_data

    def start(self):
        self.bus.log("Starting DataGateway")
        self.bus.subscribe('dg:get', self._get_data)
        self.bus.subscribe('dg:update', self._update_data)
        self.bus.log("DataGateway has been started")

    def stop(self):
        self.bus.log("Stopping DataGateway")
        self.bus.unsubscribe('dg:get', self._get_data)
        self.bus.unsubscribe('dg:update', self._update_data)
        self.bus.log("DataGateway has been stopped")

    def _update_data(self, new_val):
        self.bus.log("Changing data, buffering should start!")
        self.data = new_val
        time.sleep(UPDATE_DELAY)
        self.bus.log("Continue serving buffered and new requests.")

    def _get_data(self):
        return self.data

def processData():
    """Background task: runs for half an hour three times a day,
    and when it finishes it publishes the new data on the engine bus."""
    global next_data
    cherrypy.engine.publish("dg:update", next_data)
    next_data += 1

class DataPublisher(object):

    @property
    def data(self):
        return cherrypy.engine.publish('dg:get').pop()

    @cherrypy.expose
    def index(self):
        result = "I get " + str(self.data)
        cherrypy.engine.log(result)
        time.sleep(REQUEST_DELAY)
        return result

if __name__ == '__main__':
    conf = {
        'global': {
            'server.thread_pool': THREAD_POOL_SIZE,
            'server.socket_host': '127.0.0.1',
            'server.socket_port': 8080,
        }
    }
    cherrypy.config.update(conf)
    DataGateway(cherrypy.engine).subscribe()
    plugins.BackgroundTask(UPDATE_INTERVAL, processData).start()
    cherrypy.quickstart(DataPublisher())
In this version the synchronization comes from the fact that both the read and the write operations are executed on the cherrypy.engine thread. Everything is abstracted away in the DataGateway plugin; you operate on it simply by publishing into the engine.
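As a side note on the bus mechanics this relies on (a minimal sketch, not part of the answer; the demo:get channel name is made up): cherrypy.engine.publish() returns the list of values returned by the subscribers of that channel, which is why DataPublisher.data ends with .pop().

import cherrypy

def answer():
    return 42

cherrypy.engine.subscribe('demo:get', answer)          # register a subscriber on the channel
print(cherrypy.engine.publish('demo:get'))             # -> [42], one value per subscriber
print(cherrypy.engine.publish('demo:get').pop())       # -> 42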
The second approach uses a threading.Event object. It is a more manual approach, but it has the added benefit that it will probably be faster, given that the reads are quicker because they are not executed on the cherrypy.engine thread.

threading.Event based (a.k.a. the manual approach):
import time
import threading

import cherrypy
from cherrypy.process import plugins

UPDATE_INTERVAL = 0.5
REQUEST_DELAY = 0.1
UPDATE_DELAY = 0.1
THREAD_POOL_SIZE = 20

next_data = 1

def processData():
    """Background task: runs for half an hour three times a day,
    and when it finishes it publishes the new data on the engine bus."""
    global next_data
    cherrypy.engine.publish("doChangeData", next_data)
    next_data += 1

class DataPublisher(object):

    def __init__(self):
        self._data = next_data
        self._data_readable = threading.Event()
        cherrypy.engine.subscribe('doChangeData', self.changeData)

    @property
    def data(self):
        if self._data_readable.is_set():
            return self._data
        else:
            self._data_readable.wait()
            return self.data

    @data.setter
    def data(self, value):
        self._data_readable.clear()
        time.sleep(UPDATE_DELAY)
        self._data = value
        self._data_readable.set()

    def changeData(self, newData):
        cherrypy.engine.log("Changing data, buffering should start!")
        self.data = newData
        cherrypy.engine.log("Continue serving buffered and new requests.")

    @cherrypy.expose
    def index(self):
        result = "I get " + str(self.data)
        cherrypy.engine.log(result)
        time.sleep(REQUEST_DELAY)
        return result

if __name__ == '__main__':
    conf = {
        'global': {
            'server.thread_pool': THREAD_POOL_SIZE,
            'server.socket_host': '127.0.0.1',
            'server.socket_port': 8080,
        }
    }
    cherrypy.config.update(conf)
    plugins.BackgroundTask(UPDATE_INTERVAL, processData).start()
    cherrypy.quickstart(DataPublisher())
I added some niceties with the @property decorator, but the real gist is in the threading.Event and in the fact that the DataPublisher object is shared among the worker threads.
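As a small stand-alone illustration of the Event semantics assumed here (a sketch, not part of the answer's code): readers block in wait() while the event is cleared, and all of them resume as soon as set() is called.

import threading
import time

gate = threading.Event()          # starts cleared, like _data_readable above

def reader(name):
    gate.wait()                   # blocks until the writer calls set()
    print(name, "resumed")

for n in ("req-1", "req-2"):
    threading.Thread(target=reader, args=(n,)).start()

time.sleep(0.2)                   # simulate the short update window
gate.set()                        # both blocked readers resume at once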
I also added the thread pool configuration needed to increase the size of the thread pool in both examples; the default is 10.

As a way to test what I just said, you can execute this Python 3 script (if you don't have Python 3, now you have an excuse to install it). Given the thread pool, it makes more or less 100 simultaneous requests.

Test script:
import time
import urllib.request
import concurrent.futures

URL = 'http://localhost:8080/'
TIMEOUT = 60
DELAY = 0.05
MAX_WORKERS = 20
REQ_RANGE = range(1, 101)

def load_url():
    with urllib.request.urlopen(URL, timeout=TIMEOUT) as conn:
        return conn.read()

with concurrent.futures.ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
    futures = {}
    for i in REQ_RANGE:
        print("Sending req {}".format(i))
        futures[executor.submit(load_url)] = i
        time.sleep(DELAY)

    results = []
    for future in concurrent.futures.as_completed(futures):
        try:
            data = future.result().decode()
        except Exception as exc:
            print(exc)
        else:
            results.append((futures[future], data))

curr_max = 0
for i, data in sorted(results, key=lambda r: r[0]):
    new_max = int(data.split()[-1])
    assert new_max >= curr_max, "The data was not updated correctly"
    print("Req {}: {}".format(i, data))
    curr_max = new_max
You can't rely on the logs to diagnose this kind of problem, in particular because you don't control at which point the request gets recorded in the "access" log. I couldn't make your code fail with my test code, but in the general case there is indeed a race condition; in this example it should always work because the code is only making an atomic operation: a single periodic attribute assignment from one central point.
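To make that caveat concrete (a hypothetical sketch, not part of the answer; the GuardedData class is made up): a single reference rebind like the one above is fine on its own, but as soon as the update becomes a multi-step change, or more than one task writes, the usual safeguard is an explicit lock around both reads and writes.

import threading

class GuardedData(object):
    """Hypothetical wrapper: readers see either the old or the new value,
    never a half-finished multi-step update."""

    def __init__(self, initial):
        self._lock = threading.Lock()
        self._data = initial

    def get(self):
        with self._lock:
            return self._data

    def update(self, new_value):
        with self._lock:          # the whole swap runs as one critical section
            self._data = new_value

For a single atomic assignment, as in the examples above, this extra lock is not needed.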
I hope the code is self-explanatory; leave a comment in case you have questions.
Edit: I edited the plugin-based approach, because it was working only due to the fact that there is a single place executing the plugin; if you create another background task that updates the data, it could run into problems once you do more than just one operation. Regardless, the code will work as you are looking for if you update from just one BackgroundTask.