Fixing the issue where calculating how frequently a function is called during multiprocessing returns a negative value

I have a function foo() that can be accessed simultaneously by multiple worker processes. This function blocks until an output is ready, and then returns it. A sample foo is below:

import random
from time import sleep

def foo():
    
    # Mimic blocking of function
    sleep(random.randint(1, 3))
    
    output = 'result of some logic'
    return output

I need to calculate the rate at which this function is called (for example, once every 15 seconds). However, I don't want this calculated rate to include the time spent inside the function itself (since foo may block for a long time). To do this with just 1 worker, I run this:

import random
import time
from time import sleep

call_rate = {'rate': 0.0, 'total_time': 0.0, 'last_call': time.time(), 'total_calls': 0}

def foo():
    global call_rate
    enter_time = time.time()
    # Mimic blocking of function
    sleep(random.randint(1, 3))

    output = 'result of some logic'
    time_waited = time.time() - enter_time

    # Add the time since last function call, and remove time spent inside the function
    call_rate['total_time'] += time.time() - call_rate['last_call'] - time_waited
    call_rate['last_call'] = time.time()
    call_rate['total_calls'] += 1

    # calculate rate
    call_rate['rate'] = call_rate['total_time'] / call_rate['total_calls']
    return output

def worker(num):
    for _ in range(num):
        # Mimic doing some logic before asking output
        sleep(1)

        foo()

worker(3)

# Output: 1.005s . As expected since worker waits 1s before each call
print('foo called once every {}s'.format(call_rate['rate']))  

Basically, I sum up the time differences between consecutive calls, subtract the time spent inside the function, and divide by the total number of calls (rate = total_time / total_calls).
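The bookkeeping can be checked in isolation with synthetic timestamps instead of the real clock (the numbers and the record_call helper below are hypothetical, purely for illustration):

```python
# Single-worker bookkeeping replayed with synthetic timestamps (no sleeping).
call_rate = {'rate': 0.0, 'total_time': 0.0, 'last_call': 0.0, 'total_calls': 0}

def record_call(now, time_waited):
    # Add the gap since the previous call, minus the time blocked inside foo().
    call_rate['total_time'] += now - call_rate['last_call'] - time_waited
    call_rate['last_call'] = now
    call_rate['total_calls'] += 1
    call_rate['rate'] = call_rate['total_time'] / call_rate['total_calls']

# Worker waits 1s between calls, then foo() blocks for 2s:
# calls finish at t=3, t=6 and t=9.
for now in (3.0, 6.0, 9.0):
    record_call(now, time_waited=2.0)

print(call_rate['rate'])  # 1.0 -> one call per second of non-blocked time
```

Only the 1s gaps between calls contribute; the 2s of blocking per call is subtracted out, exactly as in the single-worker code above.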

But when I run this with multiple workers, the output is negative:

import random
import time
from time import sleep
from multiprocessing import Manager, Process


def foo(call_rate):

    enter_time = time.time()
    # Mimic blocking of function
    sleep(random.randint(1, 3))

    output = 'result of some logic'
    time_waited = time.time() - enter_time

    # Add the time since last function call, and remove time spent inside the function
    call_rate['total_time'] += time.time() - call_rate['last_call'] - time_waited
    call_rate['last_call'] = time.time()
    call_rate['total_calls'] += 1

    # calculate rate
    call_rate['rate'] = call_rate['total_time'] / call_rate['total_calls']
    return output

def worker(num, call_rate):
    for _ in range(num):
        # Mimic doing some logic before asking output
        sleep(1)

        foo(call_rate)

if __name__ == '__main__':
    # Create a shared dictionary accessible by all processes
    m = Manager()
    call_rate = m.dict({'rate': 0.0, 'total_time': 0.0, 'last_call': time.time(), 'total_calls': 0})
    
    w = []
    
    # Create 3 worker processes that run foo() thrice 
    for i in range(3):
        w.append(Process(target=worker, args=(3, call_rate,)))
        w[i].start()
    for i in range(3):
        w[i].join()
        
    # Output: -0.97s 
    print('foo called once every {}s'.format(call_rate['rate'])) 

I somewhat understand why the output is negative. Because there are now multiple processes, the time difference between consecutive calls gets smaller and smaller, and subtracting the time one process spent inside the function no longer makes much sense, since consecutive calls can now come from different processes. So my question is: how can I get an output of roughly 0.3s in the second case (since 3 workers are calling the method concurrently, each with a 1s delay), without knowing the number of workers running?
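This corruption can be reproduced deterministically with synthetic timestamps (hypothetical numbers and a hypothetical record_exit helper, no real processes involved): with a shared last_call, a slow call can subtract more blocked time than the gap it actually observed:

```python
# Replaying the shared-dict bookkeeping from foo() with synthetic timestamps.
stats = {'total_time': 0.0, 'last_call': 0.0, 'total_calls': 0}

def record_exit(now, time_waited):
    # Same arithmetic as foo(): gap since *anyone's* last call, minus own block.
    stats['total_time'] += now - stats['last_call'] - time_waited
    stats['last_call'] = now
    stats['total_calls'] += 1

# Workers A and B both enter at t=1; A blocks 1s, B blocks 3s.
record_exit(2.0, time_waited=1.0)  # A: 2 - 0 - 1 = +1
record_exit(4.0, time_waited=3.0)  # B: 4 - 2 - 3 = -1 (B's block spans A's call)
record_exit(4.0, time_waited=1.0)  # A again (entered t=3): 4 - 4 - 1 = -1

print(stats['total_time'] / stats['total_calls'])  # -0.333...
```

As soon as one worker's blocking time overlaps another worker's call, its subtracted wait time exceeds the shared gap, and the running total drifts negative.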

Disclaimer: I have asked (a rather crude variant of) this question before, here. However, before posting this question, I read the meta discussions here and here. The reason I believe this question is not a duplicate of my previous one is that it focuses on a much smaller, better-explained problem rather than my original question, which was much broader in scope and failed to explain itself clearly. My goal back then was not only to find an answer to this problem but also alternatives to my broader approach itself, which made it vague and cryptic. Unlike before, I have given reproducible code focused on a single explicit problem, and this question as a whole has more useful applications.

Update

You should probably make sure that foo updates the call_rate dictionary under a Lock instance to handle concurrent access, since you are running multiple processes. But the real problem is that the value last_call needs to be maintained per process; it cannot be shared across processes.

This solution uses a managed class, WorkerManager, which is able to keep track of all the processes created, as long as the method init_process is called for each process, passing its process id, immediately after it has been started, as shown in the code below. Then all the worker function has to do is call the method update_statistics, passing the wait time of each request it processes. A call to get_statistics returns the statistics.

import random
import time
from time import sleep
from multiprocessing import Manager, Process, Lock, current_process
from multiprocessing.managers import BaseManager

class WorkerManager:
    def __init__(self):
        self._total_calls = 0
        self._total_time = 0.0
        self._rate = 0.0
        self._lock = Lock()
        self._call_times = {}

    def init_process(self, pid):
        self._call_times[pid] = time.time()

    def update_statistics(self, pid, wait_time):
        now = time.time()
        time_elapsed = now - self._call_times[pid]
        execution_time = time_elapsed - wait_time
        self._call_times[pid] = now
        with self._lock:
            self._total_calls += 1
            self._total_time += execution_time
            self._rate = self._total_time / (self._total_calls * len(self._call_times))

    def get_statistics(self):
        return {'rate': self._rate, 'total_time': self._total_time, 'total_calls': self._total_calls}

class WorkerManagerManager(BaseManager):
    pass

WorkerManagerManager.register('WorkerManager', WorkerManager)


def foo(worker_manager):
    enter_time = time.time()
    # Mimic blocking of function
    sleep(random.randint(1, 3))

    output = 'result of some logic'

    wait_time = time.time() - enter_time

    pid = current_process().pid
    worker_manager.update_statistics(pid, wait_time)

    return output

def worker(worker_manager, num):
    for _ in range(num):
        # Mimic doing some logic before asking output
        sleep(1)

        foo(worker_manager)

if __name__ == '__main__':
    with WorkerManagerManager() as m:
        worker_manager = m.WorkerManager()
        processes = [Process(target=worker, args=(worker_manager, 3)) for _ in range(3)]
        for p in processes:
            p.start()
            worker_manager.init_process(p.pid)
        for p in processes:
            p.join()
        statistics = worker_manager.get_statistics()
        print('foo called once every {}s'.format(statistics['rate']))

Prints:

foo called once every 0.34751895621970846s

How to use a process pool

If you want to use a process pool, this is how you could submit 6 tasks with a pool of size 3:

import random
import time
from time import sleep
from multiprocessing import Manager, Pool, Lock, current_process
from multiprocessing.managers import BaseManager
from functools import partial

class WorkerManager:
    def __init__(self):
        self._total_calls = 0
        self._total_time = 0.0
        self._rate = 0.0
        self._lock = Lock()
        self._call_times = {}

    def init_process(self, pid):
        self._call_times[pid] = time.time()

    def update_statistics(self, pid, wait_time):
        now = time.time()
        time_elapsed = now - self._call_times[pid]
        execution_time = time_elapsed - wait_time
        self._call_times[pid] = now
        with self._lock:
            self._total_calls += 1
            self._total_time += execution_time
            self._rate = self._total_time / (self._total_calls * len(self._call_times))

    def get_statistics(self):
        return {'rate': self._rate, 'total_time': self._total_time, 'total_calls': self._total_calls}

class WorkerManagerManager(BaseManager):
    pass

WorkerManagerManager.register('WorkerManager', WorkerManager)


def pool_init(worker_manager):
    worker_manager.init_process(current_process().pid)

def foo(worker_manager):
    enter_time = time.time()
    # Mimic blocking of function
    sleep(random.randint(1, 3))

    output = 'result of some logic'

    wait_time = time.time() - enter_time

    pid = current_process().pid
    worker_manager.update_statistics(pid, wait_time)

    return output

def worker(worker_manager, num):
    for _ in range(num):
        # Mimic doing some logic before asking output
        sleep(1)

        foo(worker_manager)

if __name__ == '__main__':
    with WorkerManagerManager() as m:
        worker_manager = m.WorkerManager()
        pool = Pool(3, initializer=pool_init, initargs=(worker_manager,))
        # run 6 tasks
        pool.map(partial(worker, worker_manager), range(6))
        statistics = worker_manager.get_statistics()
        print('foo called once every {}s'.format(statistics['rate']))

Prints:

foo called once every 0.333592324786716s

I found a way to do this without needing to know the number of workers running:

import random
import time
from time import sleep
from multiprocessing import Manager, Process, Lock

def foo(call_rate, lock):
    # Shift this to the start of the function
    with lock:
        call_rate['total_time'] += time.time() - call_rate['last_call']
        call_rate['last_call'] = time.time()
        call_rate['total_calls'] += 1
        call_rate['rate'] = call_rate['total_time'] / call_rate['total_calls']
    
    # Mimic blocking of function
    sleep(random.randint(1, 3))

    output = 'result of some logic'


    # By doing this, we are ignoring the time spent within the function
    with lock:
        call_rate['last_call'] = time.time()
    return output

def worker(num, call_rate, lock):
    for _ in range(num):
        # Mimic doing some logic before asking output
        sleep(1)

        foo(call_rate, lock)

if __name__ == '__main__':
    # Create a shared dictionary accessible by all processes
    m = Manager()
    lock = m.Lock()
    call_rate = m.dict({'rate': 0.0, 'total_time': 0.0, 'last_call': time.time(), 'total_calls': 0})

    w = []

    # Create 3 worker processes that run foo() thrice 
    for i in range(3):
        w.append(Process(target=worker, args=(3, call_rate, lock, )))
        w[i].start()
    for i in range(3):
        w[i].join()

    # Output: 0.354s 
    print('foo called once every {}s'.format(call_rate['rate']))

I will explain why this works. In the original code, the last call time was recorded after the function had blocked. That meant the time spent inside the function had to be subtracted. But, as @Booboo already pointed out in a comment on their answer, this is problematic because there may be multiple workers running, and we can't just subtract each worker's wait time spent inside the function.

A simple workaround is to record the last call time at the start of the function, before the time spent inside the function has been added. But that still doesn't solve the broader problem, because the next time foo() is called from a worker, it will include the time the previous call spent inside the function, putting us right back at square one. But this, and I don't know why I didn't see it before, can be fixed very simply: by adding this line just before the function exits:

call_rate['last_call'] = time.time()

This ensures that when the function exits, the last call time is refreshed, so that the worker appears not to have spent any time inside the function at all. This approach doesn't require subtracting anything, which is why it works.
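The effect of that extra refresh can be checked with synthetic timestamps (the enter/leave helpers below are hypothetical, for illustration only):

```python
# Synthetic-clock replay: count the gap since the previous *exit* on entry,
# then refresh last_call again on exit, so blocked time never accumulates.
call_rate = {'rate': 0.0, 'total_time': 0.0, 'last_call': 0.0, 'total_calls': 0}

def enter(now):
    call_rate['total_time'] += now - call_rate['last_call']
    call_rate['last_call'] = now
    call_rate['total_calls'] += 1
    call_rate['rate'] = call_rate['total_time'] / call_rate['total_calls']

def leave(now):
    call_rate['last_call'] = now  # the refresh: in-function time is discarded

# Worker waits 1s between calls, foo() blocks for 5s each time.
for t_enter, t_leave in ((1.0, 6.0), (7.0, 12.0), (13.0, 18.0)):
    enter(t_enter)
    leave(t_leave)

print(call_rate['rate'])  # 1.0 -> the 5s of blocking never shows up
```

Each entry only sees the 1s elapsed since the previous exit, so the rate reflects purely the time spent outside the function.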

I ran the test 10 times and computed some statistics with the following code:

import random
import time
from time import sleep
from multiprocessing import Manager, Process, Lock
import statistics


def foo(call_rate, lock):
    with lock:
        call_rate['total_time'] += time.time() - call_rate['last_call']
        call_rate['last_call'] = time.time()
        call_rate['total_calls'] += 1
        call_rate['rate'] = call_rate['total_time'] / call_rate['total_calls']
    # Mimic blocking of function
    sleep(2)

    output = 'result of some logic'


    # By doing this, we are ignoring the time spent within the function
    with lock:
        call_rate['last_call'] = time.time()
    return output

def worker(num, call_rate, lock):
    for _ in range(num):
        # Mimic doing some logic before asking output
        sleep(1)

        foo(call_rate, lock)

def main():
    # Create a shared dictionary accessible by all processes
    m = Manager()
    lock = m.Lock()
    call_rate = m.dict({'rate': 0.0, 'total_time': 0.0, 'last_call': time.time(), 'total_calls': 0})

    w = []

    # Create 3 worker processes that run foo() thrice
    for i in range(3):
        w.append(Process(target=worker, args=(3, call_rate, lock, )))
        w[i].start()
    for i in range(3):
        w[i].join()

    return call_rate['rate']

if __name__ == '__main__':
    avgs = []
    for i in range(10):
        avgs.append(main())

    print("Highest is : {}".format(max(avgs)))
    print("Lowest is : {}".format(min(avgs)))
    print("Average is : {}".format(statistics.mean(avgs)))

This outputs:

Highest is : 0.35980285538567436
Lowest is : 0.3536567423078749
Average is : 0.356808172331916

As 'proof' that the code above really does ignore the time spent inside the function, you can make the function block for a larger amount of time, say 15s, and the output will still be roughly the same.

Update

The reason the rate is not 0.3s when the function blocks for a variable amount of time has to do with when the workers enter and exit foo(). Consider the code below, where two workers each run foo() twice and print call_rate, along with a unique id identifying the worker, every time they enter and exit foo():

import random
import time
from time import sleep
from multiprocessing import Manager, Process, Lock
import statistics
import string

def foo(call_rate, lock, id):
    with lock:
        call_rate['total_time'] += time.time() - call_rate['last_call']
        call_rate['last_call'] = time.time()
        call_rate['total_calls'] += 1
        call_rate['rate'] = call_rate['total_time'] / call_rate['total_calls']
        print("{} entered, call rate {}".format(id, call_rate))
    # Mimic blocking of function
    sleep(1)

    output = 'result of some logic'

    # By doing this, we are ignoring the time spent within the function
    with lock:
        call_rate['last_call'] = time.time()
        print("{} exited, call rate {}".format(id, call_rate))
    return output


def id_generator(size=6, chars=string.ascii_uppercase + string.digits):
    return ''.join(random.choice(chars) for _ in range(size))


def worker(num, call_rate, lock):
    id = id_generator()
    for _ in range(num):
        # Mimic doing some logic before asking output
        sleep(1)

        foo(call_rate, lock, id)

def main():
    # Create a shared dictionary accessible by all processes
    m = Manager()
    lock = m.Lock()
    call_rate = m.dict({'rate': 0.0, 'total_time': 0.0, 'last_call': time.time(), 'total_calls': 0})

    w = []

    # Create 2 worker processes that run foo() twice
    for i in range(2):
        w.append(Process(target=worker, args=(2, call_rate, lock, )))
        w[i].start()
    for i in range(2):
        w[i].join()

    return call_rate['rate']

if __name__ == '__main__':
    avgs = []
    for i in range(1):
        avgs.append(main())

    print("Highest is : {}".format(max(avgs)))
    print("Lowest is : {}".format(min(avgs)))
    print("Average is : {}".format(statistics.mean(avgs)))

Note that in this code, foo() always blocks for 1 second. Since there are two workers, the rate should be close to 0.5s. Running this code:

Output #1:

XEC6AU entered, call rate {'rate': 1.1851444244384766, 'total_time': 1.1851444244384766, 'last_call': 1624950732.381014, 'total_calls': 1}
O43FUI entered, call rate {'rate': 0.6178374290466309, 'total_time': 1.2356748580932617, 'last_call': 1624950732.4325447, 'total_calls': 2}
XEC6AU exited, call rate {'rate': 0.6178374290466309, 'total_time': 1.2356748580932617, 'last_call': 1624950733.4327667, 'total_calls': 2}
O43FUI exited, call rate {'rate': 0.6178374290466309, 'total_time': 1.2356748580932617, 'last_call': 1624950733.4484024, 'total_calls': 2}
XEC6AU entered, call rate {'rate': 0.7401185035705566, 'total_time': 2.22035551071167, 'last_call': 1624950734.433083, 'total_calls': 3}
O43FUI entered, call rate {'rate': 0.558994710445404, 'total_time': 2.235978841781616, 'last_call': 1624950734.4487064, 'total_calls': 4}
XEC6AU exited, call rate {'rate': 0.558994710445404, 'total_time': 2.235978841781616, 'last_call': 1624950735.4333804, 'total_calls': 4}
O43FUI exited, call rate {'rate': 0.558994710445404, 'total_time': 2.235978841781616, 'last_call': 1624950735.4958992, 'total_calls': 4}
Highest is : 0.558994710445404
Lowest is : 0.558994710445404
Average is : 0.558994710445404

The rate is ~0.5s, which is to be expected. Notice how both workers enter and exit the function at the same times. Now change the function's blocking time from 1s to random.randint(1, 10), and this is what I get:

Output #2:

NHXAKF entered, call rate {'rate': 1.1722326278686523, 'total_time': 1.1722326278686523, 'last_call': 1624886294.4630196, 'total_calls': 1}
R2DD8H entered, call rate {'rate': 0.5939309597015381, 'total_time': 1.1878619194030762, 'last_call': 1624886294.478649, 'total_calls': 2}
NHXAKF exited, call rate {'rate': 0.5939309597015381, 'total_time': 1.1878619194030762, 'last_call': 1624886300.4648588, 'total_calls': 2}
NHXAKF entered, call rate {'rate': 0.7293914159138998, 'total_time': 2.188174247741699, 'last_call': 1624886301.465171, 'total_calls': 3}
R2DD8H exited, call rate {'rate': 0.7293914159138998, 'total_time': 2.188174247741699, 'last_call': 1624886302.4811018, 'total_calls': 3}
R2DD8H entered, call rate {'rate': 0.7971136569976807, 'total_time': 3.1884546279907227, 'last_call': 1624886303.4813821, 'total_calls': 4}
NHXAKF exited, call rate {'rate': 0.7971136569976807, 'total_time': 3.1884546279907227, 'last_call': 1624886304.4660738, 'total_calls': 4}
R2DD8H exited, call rate {'rate': 0.7971136569976807, 'total_time': 3.1884546279907227, 'last_call': 1624886307.4826, 'total_calls': 4}
Highest is : 0.7971136569976807
Lowest is : 0.7971136569976807
Average is : 0.7971136569976807

Unlike before, the rate is now almost 0.8s. Moreover, the two workers no longer enter and exit the function together. This is of course because one blocks for longer than the other. But because they are no longer in sync, they each wait the 1s at different times, instead of waiting together inside the worker() function. You can even see this in call_rate['total_time']: for Output #1, where the workers were in sync, it is ~2s, while for Output #2 it is ~3s. Hence the difference in rates. So 0.8s is the true rate at which the workers are calling foo() in this scenario, not the presumed 0.5s. Multiplying the rate by the number of processes misses this nuance.
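The interleaving argument can be replayed deterministically with synthetic timestamps (the simulate helper and event lists below are hypothetical reconstructions of the two outputs, not the actual measured data):

```python
# Deterministic replay of the two scenarios with synthetic timestamps.
def simulate(events):
    """events: list of ('enter'|'exit', timestamp) in global time order."""
    stats = {'total_time': 0.0, 'last_call': 0.0, 'total_calls': 0}
    for kind, now in events:
        if kind == 'enter':
            stats['total_time'] += now - stats['last_call']
            stats['total_calls'] += 1
        stats['last_call'] = now  # refreshed on both entry and exit
    return stats['total_time'] / stats['total_calls']

# Both workers block 1s: they stay in lock-step (enter together, exit together).
in_sync = [('enter', 1.0), ('enter', 1.0), ('exit', 2.0), ('exit', 2.0),
           ('enter', 3.0), ('enter', 3.0)]

# Worker A blocks 1s, worker B blocks 3s: their 1s waits no longer overlap.
out_of_sync = [('enter', 1.0), ('enter', 1.0), ('exit', 2.0),
               ('enter', 3.0), ('exit', 4.0), ('exit', 4.0), ('enter', 5.0)]

print(simulate(in_sync))      # 0.5
print(simulate(out_of_sync))  # 0.75
```

In the synchronized case the two 1s waits overlap, so each pair of calls adds only 1s of gap; once desynchronized, the waits happen back to back and more gap time accumulates per call, pushing the rate up, just as in Output #2.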