如何监控一系列功能的执行情况?也许是 RxPy?

How to monitor the execution of a series of functions? Maybe RxPy?

我有一个要执行的函数列表,就像在管道中一样。

pipeline = [f1, f2, f3, f4]

(基本上我写的函数就在列表里看:https://mathieularose.com/function-composition-in-python/):

def run(pipeline):
     return functools.reduce(lambda f, g: lambda x: f(g(x)), pipeline, lambda x: x)

我想观察函数何时被调用、何时完成、是否失败等,并将其记录到文件或数据库或 MongoDB 等。每个函数都是 returning 一些 python 对象,所以我想使用 return 值并记录它的属性,例如

如果f1return是list我要登录f1 was completed at 23:00 on 04/22/2018. It returned a list of length 5等等

我的问题不在于执行函数,而是关于观察函数的行为。我希望这些函数与我如何编写管道代码无关。

我想知道这里如何实现观察者模式。我知道 "observer pattern" 听起来太过分了 "Object Oriented",所以我的第一个想法是使用装饰器,但在搜索这方面的指南时我发现了 RxPy.

所以,我正在寻找有关如何解决此问题的指南。

类似于:

import time

def run(pipeline):
    def composed(x):
        for f in pipeline:
            y = f(x)
            print('{} completed on {}. it returned {}'.format(f, time.time(), y))
            x = y
        return x
    return composed

观察者模式不适合函数式风格。

编辑:基于回调:

def run(pipeline, callback):
    def composed(x):
        for f in pipeline:
            y = f(x)
            callback(f, time.time(), y)
            x = y
        return x
    return composed

如果你喜欢装饰器,可以使用以下方法:

def log_finish_time(f):
    def g(*args):
        ret = f(*args)
        print('{} completed on {}'.format(f, time.time()))
        return ret
    return g

def log_return_value(f):
    def g(*args):
        ret = f(*args)
        print('{} returned {}'.format(f, ret))
        return ret
    return g

@log_finish_time
@log_return_value
def f1(x):
    return x+1

@log_finish_time
@log_return_value
def f2(x):
    return x*x

无需更改您的合成代码。

如果你想为你的装饰器提供参数,你必须在中间添加另一个函数(基本上它是一个 returns 装饰器的函数):

def log_finish_time(log_prefix):
    def h(f):
        def g(*args):
            ret = f(*args)
            print('{}: {} completed on {}'.format(log_prefix, f, time.time()))
            return ret
        return g
    return h

@log_finish_time('A')
def f1(x):
    return x+1