共享只读内存和函数 python

Shared read only memory and functions python

我需要 运行 对输入列表进行并行处理,但在处理过程中使用上面代码中定义的所有变量和函数。但是这个过程本身可以并行化,因为它只依赖于一个变量,即列表的输入。 所以我有两种可能性,但我不知道如何实现这两种可能性:

1) 使用 class,并且有一个方法应该使用那个 class 的所有函数和属性进行并行化。即:运行 并行循环中的方法,但有机会读取对象的属性而不创建对象的副本。

2) 在 运行 并行化进程之前只需要一个大的 main 并定义全局变量。

例如:


from joblib import Parallel, delayed


def func(x,y,z):
    #do something 
    a = func0(x,y) #whatever function
    a = func1(a,z) #whatever function
    return a

if name==“__main__””:
   #a lot of stuff in which you create y and z
   global y,z
   result = Parallel(n_jobs=2)(delayed(func)(i,y,z)for i in range(10))

所以问题是,当我到达并行函数时,y 和 z 已经定义,它们只是查找数据,我的问题是如何将这些值传递给并行函数,而不 python 为每个作业创建一个副本?

如果您只需要将列表传递给一些并行进程,我会使用内置的线程模块。据我所知你的问题,这就是你所需要的,你可以将参数传递给线程。

这是一个基本的线程设置:

import threading

def func(x, y):
    print(x, y) # random example

x, y = "foo", "bar"

threads = []
for _ in range(10): # create 10 threads
    t = threading.Thread(target=func, args=(x, y,))
    threads.append(t)
    t.start()

for t in threads:
    t.join() # waits for the thread to complete

但是,如果您需要以线程安全的方式跟踪该列表,您将需要使用队列:

import threading, queue

# build a thread-safe list
my_q = queue.Queue()
for i in range(1000):
    my_q.put(i)

# here is your worker function
def worker(queue):
    while not queue.empty():
        task = queue.get() # get the next value from the queue
        print(task)
        queue.task_done() # when you are done tell the queue that this task is complete


# spin up some threads
threads = []
for _ in range(10):
    t = threading.Thread(target=worker, args=(my_q,))
    threads.append(t)
    t.start()

my_q.join() # joining the queue means your code will wait here until the queue is empty

现在要回答有关共享状态的问题,您可以创建一个对象来保存您的变量。通过这种方式,您可以传递对象本身,而不是将变量的副本传递给每个线程(我相信这称为 Borg,但我在这方面可能有点错误)。执行此操作时,如果您计划对共享变量进行任何更改,则会导入它以确保它们是线程安全的。例如,如果两个线程试图同时递增一个数字,您可能会丢失该更改,因为一个线程会覆盖另一个线程。为了防止这种情况,我们使用 threading.Lock 对象。 (如果你不关心这个,就忽略下面所有的锁)。

还有其他方法可以做到这一点,但我发现这种方法易于理解且非常灵活:

import threading

# worker function
def worker(vars, lock):
    with lock:
        vars.counter += 1
    print(f"{threading.current_thread().name}: counter = {vars.counter}")

# this holds your variables to be referenced by threads
class Vars(object):
    counter = 0
vars = Vars()

lock = threading.Lock()

# spin up some threads
threads = []
for _ in range(10):
    t = threading.Thread(target=worker, args=(vars, lock, ))
    threads.append(t)
    t.start()

for t in threads:
    t.join()