使用线程锁定在 FIFO 队列中执行函数

Execute Functions in a FIFO queue with Thread Locking

我想在 FIFO 队列中执行函数调用并从每次调用中获取结果。

def func1(arg1):
    return arg1
def func2(arg1):
    return arg1

我会随机从不同线程调用这些函数。所以基本上我想要的是一次只执行其中一个函数,我希望能够从每次调用中获取 return 值。

尝试这样的事情:

import threading
import time


def func1(arg1):
    return arg1

def func2(arg1):
    return arg1

def square(x):
    time.sleep(5)
    return x * x

one_at_a_time = threading.Lock()
action_to_func_mapping = {
    "action1": func1,
    "action2": func2,
    "square_number": square,
}

def dispatch(action, arg1):
    one_at_a_time.acquire()
    func_to_call = action_to_func_mapping[action]
    func_result = func_to_call(arg1)
    one_at_a_time.release()
    return func_result

这是一个例子:

if __name__ == '__main__':
    square_result = dispatch('square_number', 2)
    print(square_result)
    func1_result = dispatch('action1', 3)
    print(func1_result)


>>> python dispatch.py
4
3

你在这里看不到锁的效果:这个例子是单线程的,线程之间永远不会有任何争用。您可以使用一些简单的线程代码来更新问题,以显示正在运行的锁。

在多线程上下文中,上述代码将阻塞其他线程并等待第一个线程完成其功能的执行。在您的应用程序中,您可以做一些更有效率的事情。例如,您可能每个函数都有一个锁,而不是一次一个用于所有函数调用的漏斗:

action_to_func_mapping = {
    "action1": func1,
    "action2": func2,
    "square_number": square,
}
action_to_func_with_perfunc_lock = {action: (func, threading.Lock())
     for action, func in action_to_func_mapping.items()}


def dispatch(action, arg1):
    func_to_call, func_lock = action_to_func_with_perfunc_lock[action]
    func_lock.acquire()
    func_result = func_to_call(arg1)
    func_lock.release()
    return func_result

>>> python dispatch.py
4
3

这次 square_number 将阻塞下一个 square_number 直到第一个完成,但是 action1action2 不会被 square_number 阻塞称呼。

这里使用了一些Python工具:dict comprehensions, tuples,元组解包。

这是一个完整的示例,其中包含一个 FIFO 队列和线程,对比每个函数的锁和一次一个锁。前面的答案是正确的,但不能证明锁定策略及其效果,因为那里的代码是单线程的。现在的答案引入了线程,所以确实发生了线程锁定,你可以看到结果。

import threading
from collections import deque
from time import sleep, time


def duck(behaviour):
    sleep(1)
    return behaviour


def sloth(behaviour):
    sleep(5)
    return behaviour


def swift(behaviour):
    sleep(0.1)
    return behaviour


animal_to_func = {
    "sloth": sloth,
    "duck": duck,
    "swift": swift,
}
one_at_a_time_lock = threading.Lock()
animal_to_func_with_per_animal_lock = {action: (func, threading.Lock())
                                       for action, func in animal_to_func.items()}

fifo = deque()


class AnimalThread(threading.Thread):
    def __init__(self, thread_id, use_per_animal_lock=False):
        threading.Thread.__init__(self)
        self.thread_id = thread_id
        self.use_per_animal_lock = use_per_animal_lock

    def run(self):
        while len(fifo):
            animal, behaviour = fifo.popleft()
            animal_action = dispatch(animal, behaviour, self.use_per_animal_lock)
            print('  (thread %s) %s: %s' % (self.thread_id, animal, str(animal_action)))


def dispatch(animal, behaviour, use_per_animal_lock=False):
    if use_per_animal_lock:
        animal_function, lock = animal_to_func_with_per_animal_lock[animal]
    else:
        lock = one_at_a_time_lock
        animal_function = animal_to_func[animal]
    lock.acquire()
    what_just_happened = animal_function(behaviour)
    lock.release()
    return what_just_happened


if __name__ == '__main__':
    monday_morning = [
        ('sloth', 'wake up'),
        ('sloth', 'blink'),
        ('sloth', 'go back to sleep'),
        ('duck', 'wake up'),
        ('duck', 'quack'),
        ('duck', 'waddle'),
        ('duck', 'swim'),
        ('duck', 'fight duck #2'),
        ('duck', 'quack'),
        ('swift', 'wake up'),
        ('swift', 'catch insects'),
        ('swift', 'feed chicks'),
    ]
    # essentially unlimited threads to force locks to be used
    no_of_threads = len(monday_morning)
    print('One at a time, no pushing and shoving!...')
    # load the FIFO queue the threads will consume
    fifo.extend(monday_morning)
    # start threads
    threads = [AnimalThread(i) for i in range(no_of_threads)]
    [thread.start() for thread in threads]
    # wait for threads to finish
    [thread.join() for thread in threads]
    print('One of each kind of animal at a time...')
    # load the FIFO queue the threads will consume
    fifo.extend(monday_morning)
    # start threads
    threads = [AnimalThread(threadno, use_per_animal_lock=True) for threadno in range(no_of_threads)]
    [thread.start() for thread in threads]
    # wait for threads to finish
    [thread.join() for thread in threads]

这是一个例子运行:

 One at a time, no pushing and shoving!...
   (thread 0) sloth: wake up
   (thread 1) sloth: blink
   (thread 2) sloth: go back to sleep
   (thread 3) duck: wake up
   (thread 4) duck: quack
   (thread 5) duck: waddle
   (thread 6) duck: swim
   (thread 7) duck: fight duck #2
   (thread 8) duck: quack
   (thread 9) swift: wake up
   (thread 10) swift: catch insects
   (thread 11) swift: feed chicks
 One of each kind of animal at a time...
   (thread 9) swift: wake up
   (thread 10) swift: catch insects
   (thread 11) swift: feed chicks
   (thread 3) duck: wake up
   (thread 4) duck: quack
   (thread 5) duck: waddle
   (thread 6) duck: swim
   (thread 0) sloth: wake up
   (thread 7) duck: fight duck #2
   (thread 8) duck: quack
   (thread 1) sloth: blink
   (thread 2) sloth: go back to sleep

一次一个锁遵循 FIFO 队列中项目的顺序。这是一个漏斗,一次只允许调用一个函数。它的问题在于它有效地将多线程应用程序转换为串行的单线程应用程序。

当上面的代码中有per-animal lock时,动物们一次只能做一件事(树懒不能同时醒来和眨眼)但是它们是相互独立的并且可以继续自己的生活。 swift 在鸭子和树懒醒来之前完成。但是因为鸭子早上很忙,一次只能做一件事,树懒在鸭子做完之前就醒了。

这种锁定在时间敏感的应用程序中很有用,在这些应用程序中,特定类型的函数需要按照它们到达队列的顺序进行处理。金钱交易就是一个例子,您可能有多个工作单元与一个帐户相关,并且必须按照它们到达队列的顺序进行处理。