结合 tornado gen.coroutine 和 joblib mem.cache 装饰器

Combine tornado gen.coroutine and joblib mem.cache decorators

假设有一个函数处理繁重的计算工作,我们希望在 Tornado 应用程序上下文中异步执行。此外,我们希望通过将结果存储到磁盘来懒惰地评估该函数,而不是为相同的参数重新运行该函数两次。

如果不缓存结果(记忆),可以执行以下操作:

def complex_computation(arguments):
    ...
    return result

@gen.coroutine
def complex_computation_caller(arguments):
    ...
    result = complex_computation(arguments)
    raise gen.Return(result)

假设要实现函数记忆化,我们从joblib中选择Memoryclass。通过简单地用 @mem.cache 装饰函数,函数可以很容易地被记忆:

@mem.cache
def complex_computation(arguments):
    ...
    return result

其中 mem 可以是 mem = Memory(cachedir=get_cache_dir()).

现在考虑将两者结合起来,我们在执行器上执行计算复杂的函数:

class TaskRunner(object):
    def __init__(self, loop=None, number_of_workers=1):
        self.executor = futures.ThreadPoolExecutor(number_of_workers)
        self.loop = loop or IOLoop.instance()

    @run_on_executor
    def run(self, func, *args, **kwargs):
        return func(*args, **kwargs)

mem = Memory(cachedir=get_cache_dir())
_runner = TaskRunner(1)

@mem.cache
def complex_computation(arguments):
    ...
    return result

@gen.coroutine
def complex_computation_caller(arguments):
    result = yield _runner.run(complex_computation, arguments)
    ...
    raise gen.Return(result)

那么第一个问题就是上述方法在技术上是否正确?

现在让我们考虑以下场景:

@gen.coroutine
def first_coroutine(arguments):
    ...
    result = yield second_coroutine(arguments)
    raise gen.Return(result)

@gen.coroutine
def second_coroutine(arguments):
    ...
    result = yield third_coroutine(arguments)
    raise gen.Return(result)

第二个问题是如何记忆second_coroutine?做这样的事情是否正确:

@gen.coroutine
def first_coroutine(arguments):
    ...
    mem = Memory(cachedir=get_cache_dir())
    mem_second_coroutine = mem(second_coroutine)
    result = yield mem_second_coroutine(arguments)
    raise gen.Return(result)

@gen.coroutine
def second_coroutine(arguments):
    ...
    result = yield third_coroutine(arguments)
    raise gen.Return(result)

[UPDATE I] 讨论使用 functools.lru_cacherepoze.lru.lru_cache 作为第二个问题的解决方案。

Tornado 协程返回的 Future 对象是可重用的,因此使用 functools.lru_cache 等内存缓存通常是可行的,如 中所述。请务必将缓存装饰器放在 @gen.coroutine.

之前

磁盘缓存(这似乎由 Memorycachedir 参数暗示)更棘手,因为 Future 对象通常不能写入磁盘。您的 TaskRunner 示例应该可以工作,但它做的事情与其他示例根本不同,因为 complex_calculation 不是协程。您的最后一个示例将不起作用,因为它试图将 Future 对象放入缓存中。

相反,如果你想用装饰器缓存东西,你需要一个用第二个协程包装内部协程的装饰器。像这样:

def cached_coroutine(f):
    @gen.coroutine
    def wrapped(*args):
        if args in cache:
            return cache[args]
        result = yield f(*args)
        cache[args] = f
        return result
    return wrapped