结合 tornado gen.coroutine 和 joblib mem.cache 装饰器
Combine tornado gen.coroutine and joblib mem.cache decorators
假设有一个函数处理繁重的计算工作,我们希望在 Tornado 应用程序上下文中异步执行。此外,我们希望通过将结果存储到磁盘来懒惰地评估该函数,而不是为相同的参数重新运行该函数两次。
如果不缓存结果(记忆),可以执行以下操作:
def complex_computation(arguments):
...
return result
@gen.coroutine
def complex_computation_caller(arguments):
...
result = complex_computation(arguments)
raise gen.Return(result)
假设要实现函数记忆化,我们从joblib中选择Memoryclass。通过简单地用 @mem.cache
装饰函数,函数可以很容易地被记忆:
@mem.cache
def complex_computation(arguments):
...
return result
其中 mem
可以是 mem = Memory(cachedir=get_cache_dir())
.
现在考虑将两者结合起来,我们在执行器上执行计算复杂的函数:
class TaskRunner(object):
def __init__(self, loop=None, number_of_workers=1):
self.executor = futures.ThreadPoolExecutor(number_of_workers)
self.loop = loop or IOLoop.instance()
@run_on_executor
def run(self, func, *args, **kwargs):
return func(*args, **kwargs)
mem = Memory(cachedir=get_cache_dir())
_runner = TaskRunner(1)
@mem.cache
def complex_computation(arguments):
...
return result
@gen.coroutine
def complex_computation_caller(arguments):
result = yield _runner.run(complex_computation, arguments)
...
raise gen.Return(result)
那么第一个问题就是上述方法在技术上是否正确?
现在让我们考虑以下场景:
@gen.coroutine
def first_coroutine(arguments):
...
result = yield second_coroutine(arguments)
raise gen.Return(result)
@gen.coroutine
def second_coroutine(arguments):
...
result = yield third_coroutine(arguments)
raise gen.Return(result)
第二个问题是如何记忆second_coroutine
?做这样的事情是否正确:
@gen.coroutine
def first_coroutine(arguments):
...
mem = Memory(cachedir=get_cache_dir())
mem_second_coroutine = mem(second_coroutine)
result = yield mem_second_coroutine(arguments)
raise gen.Return(result)
@gen.coroutine
def second_coroutine(arguments):
...
result = yield third_coroutine(arguments)
raise gen.Return(result)
[UPDATE I] 讨论使用 functools.lru_cache
或 repoze.lru.lru_cache
作为第二个问题的解决方案。
Tornado 协程返回的 Future
对象是可重用的,因此使用 functools.lru_cache
等内存缓存通常是可行的,如 中所述。请务必将缓存装饰器放在 @gen.coroutine
.
之前
磁盘缓存(这似乎由 Memory
的 cachedir
参数暗示)更棘手,因为 Future
对象通常不能写入磁盘。您的 TaskRunner
示例应该可以工作,但它做的事情与其他示例根本不同,因为 complex_calculation
不是协程。您的最后一个示例将不起作用,因为它试图将 Future
对象放入缓存中。
相反,如果你想用装饰器缓存东西,你需要一个用第二个协程包装内部协程的装饰器。像这样:
def cached_coroutine(f):
@gen.coroutine
def wrapped(*args):
if args in cache:
return cache[args]
result = yield f(*args)
cache[args] = f
return result
return wrapped
假设有一个函数处理繁重的计算工作,我们希望在 Tornado 应用程序上下文中异步执行。此外,我们希望通过将结果存储到磁盘来懒惰地评估该函数,而不是为相同的参数重新运行该函数两次。
如果不缓存结果(记忆),可以执行以下操作:
def complex_computation(arguments):
...
return result
@gen.coroutine
def complex_computation_caller(arguments):
...
result = complex_computation(arguments)
raise gen.Return(result)
假设要实现函数记忆化,我们从joblib中选择Memoryclass。通过简单地用 @mem.cache
装饰函数,函数可以很容易地被记忆:
@mem.cache
def complex_computation(arguments):
...
return result
其中 mem
可以是 mem = Memory(cachedir=get_cache_dir())
.
现在考虑将两者结合起来,我们在执行器上执行计算复杂的函数:
class TaskRunner(object):
def __init__(self, loop=None, number_of_workers=1):
self.executor = futures.ThreadPoolExecutor(number_of_workers)
self.loop = loop or IOLoop.instance()
@run_on_executor
def run(self, func, *args, **kwargs):
return func(*args, **kwargs)
mem = Memory(cachedir=get_cache_dir())
_runner = TaskRunner(1)
@mem.cache
def complex_computation(arguments):
...
return result
@gen.coroutine
def complex_computation_caller(arguments):
result = yield _runner.run(complex_computation, arguments)
...
raise gen.Return(result)
那么第一个问题就是上述方法在技术上是否正确?
现在让我们考虑以下场景:
@gen.coroutine
def first_coroutine(arguments):
...
result = yield second_coroutine(arguments)
raise gen.Return(result)
@gen.coroutine
def second_coroutine(arguments):
...
result = yield third_coroutine(arguments)
raise gen.Return(result)
第二个问题是如何记忆second_coroutine
?做这样的事情是否正确:
@gen.coroutine
def first_coroutine(arguments):
...
mem = Memory(cachedir=get_cache_dir())
mem_second_coroutine = mem(second_coroutine)
result = yield mem_second_coroutine(arguments)
raise gen.Return(result)
@gen.coroutine
def second_coroutine(arguments):
...
result = yield third_coroutine(arguments)
raise gen.Return(result)
[UPDATE I] functools.lru_cache
或 repoze.lru.lru_cache
作为第二个问题的解决方案。
Tornado 协程返回的 Future
对象是可重用的,因此使用 functools.lru_cache
等内存缓存通常是可行的,如 @gen.coroutine
.
磁盘缓存(这似乎由 Memory
的 cachedir
参数暗示)更棘手,因为 Future
对象通常不能写入磁盘。您的 TaskRunner
示例应该可以工作,但它做的事情与其他示例根本不同,因为 complex_calculation
不是协程。您的最后一个示例将不起作用,因为它试图将 Future
对象放入缓存中。
相反,如果你想用装饰器缓存东西,你需要一个用第二个协程包装内部协程的装饰器。像这样:
def cached_coroutine(f):
@gen.coroutine
def wrapped(*args):
if args in cache:
return cache[args]
result = yield f(*args)
cache[args] = f
return result
return wrapped