与线程共享内存

Memory Shared with threading

我在尝试让多个线程共享内存时遇到了问题。我希望计数器在所有线程之间共享,所有线程合起来只计数到某个数字(本例中为 100),最后把结果返回给主线程。问题是,即使加了锁,每个线程仍然只维护自己的那一个计数。

import threading
from threading import Thread, Lock
import time
import multiprocessing
import random

def create_workers(n_threads, counter):
    """Spawn ``n_threads`` DataCampThread workers and wait for all of them.

    Each worker is named ``'Thread - <index>'`` and receives the value of
    ``counter`` as its starting count.

    Returns the ``counter`` argument exactly as it was passed in; nothing in
    this function rebinds it.
    """
    pool = [
        DataCampThread('Thread - ' + str(index), counter)
        for index in range(n_threads)
    ]

    # Launch every worker first, then block until each one has finished.
    for thread in pool:
        thread.start()
    for thread in pool:
        thread.join()

    return counter

def thread_delay(thread_name, num, delay):
    """Return ``num + 1`` after pausing for ``delay`` seconds.

    The incremented value is printed, prefixed with ``thread_name``, once the
    pause is over.
    """
    bumped = num + 1
    time.sleep(delay)
    print(thread_name, '-------->', bumped)
    return bumped

class DataCampThread(Thread):
    """Worker thread that counts its own ``counter`` attribute up to 100.

    NOTE: ``counter`` is stored on the instance and ``lock`` is created anew
    for every instance, so neither is shared between workers; each thread
    advances its own count independently of the others.
    """

    def __init__(self, name, cou):
        """Set up the worker.

        name: label used in the progress messages.
        cou:  starting value for this worker's counter.
        """
        super().__init__()
        self.name = name
        self.counter = cou
        # Seconds to sleep on every increment (1 or 2, chosen at random).
        self.delay = random.randint(1, 2)
        self.lock = Lock()

    def run(self):
        """Increment ``self.counter`` via ``thread_delay`` until it reaches 100."""
        print('Starting Thread:', self.name)
        while self.counter < 100:
            # ``with`` releases the lock even if thread_delay raises.
            with self.lock:
                self.counter = thread_delay(self.name, self.counter, self.delay)
        print('Execution of Thread:', self.name, 'is complete!')

if __name__ == '__main__':
    # Run a fixed number of workers, each starting its count from zero.
    n_threads, counter = 3, 0
    create_workers(n_threads, counter)
    # Prints the module-level ``counter``, which is not reassigned above.
    print(counter)
    print("Thread execution is complete!")

正如我在评论中提到的,我不太确定你想做什么——但下面是一个未必准确的猜测,(希望)能帮你加快进度。

我的回答的初始版本使用了全局变量,而您回应说希望避免使用全局变量,因此计数器现在是一个 class 属性,它会自动被该 class 的所有实例共享。每个线程都有自己的名称,以及一个随机选择的时间量;线程在每两次更新名为 counter 的共享 class 属性之间会延迟这段时间。

注意:测试代码重新定义了print()函数,防止它被多个线程同时使用

import threading
from threading import Thread, Lock
import time
import random

MAXVAL = 10  # Default upper bound for the shared counter.


class DataCampThread(Thread):
    """Worker thread that helps advance a counter shared by all instances.

    ``counter`` is a class attribute, so every instance reads and updates the
    same value; ``counter_lock`` guards each check-and-increment step.
    """

    counter = 0  # Class attribute.
    counter_lock = Lock()  # Control concurrent access to shared class attribute.

    def __init__(self, name, maxval=MAXVAL):
        """Set up the worker.

        name:   label used in the progress messages.
        maxval: this worker stops once the shared counter has reached this
                value (defaults to the module-level ``MAXVAL``).
        """
        super().__init__()  # Initialize base class.
        self.name = name
        self.maxval = maxval
        # Seconds to pause between increments (1 or 2, chosen at random).
        self.delay = random.randint(1, 2)

    def run(self):
        """Increment the shared counter until it reaches ``self.maxval``."""
        cls = type(self)
        print('Starting Thread:', self.name)
        while True:
            with cls.counter_lock:
                if cls.counter >= self.maxval:
                    break  # Exit while loop (also releases lock).
                # Assign through the class: ``self.counter += 1`` would
                # create an instance-level attribute instead.
                cls.counter += 1
                print(self.name, '-------->', cls.counter)
            # Sleep outside the lock so other workers can take their turn.
            time.sleep(self.delay)
        print('Execution of Thread:', self.name, 'is complete!')


def main(n_threads, maxval=MAXVAL):
    """Create and start worker threads, then wait for them all to finish.

    n_threads: number of DataCampThread workers to run.
    maxval:    value at which the workers stop counting; forwarded to every
               worker.

    The shared ``DataCampThread.counter`` is not reset here, so counting
    resumes from whatever value it already holds.
    """
    workers = [DataCampThread(name=f'Thread #{i}', maxval=maxval)
               for i in range(n_threads)]

    for worker in workers:
        worker.start()

    # Wait for all threads to finish.
    for worker in workers:
        worker.join()


if __name__ == '__main__':

    import builtins

    # Shadow the built-in print with a module-level wrapper so that the
    # print() calls made by this module (including those in the worker
    # threads' run() method) all go through a single lock.
    def print(*args, **kwargs):
        ''' Redefine print to prevent concurrent printing. '''
        with print.lock:
            builtins.print(*args, **kwargs)

    # The lock is stored on the function object itself; it has to be set
    # before the wrapper is called for the first time.
    print.lock = Lock()  # Function attribute.

    n_threads = 3
    main(n_threads, MAXVAL)
    print()
    print('Thread execution is complete!')
    # Read the total from the class, where the shared counter is stored.
    print('final counter value:', DataCampThread.counter)

示例输出:

Starting Thread: Thread #0
Starting Thread: Thread #1
Thread #0 --------> 1
Starting Thread: Thread #2
Thread #1 --------> 2
Thread #2 --------> 3
Thread #1 --------> 4
Thread #0 --------> 5
Thread #2 --------> 6
Thread #2 --------> 7
Thread #1 --------> 8
Thread #0 --------> 9
Thread #2 --------> 10
Execution of Thread: Thread #1 is complete!
Execution of Thread: Thread #0 is complete!
Execution of Thread: Thread #2 is complete!

Thread execution is complete!
final counter value: 10