共享只读内存和函数 python
Shared read only memory and functions python
我需要 运行 对输入列表进行并行处理,但在处理过程中使用上面代码中定义的所有变量和函数。但是这个过程本身可以并行化,因为它只依赖于一个变量,即列表的输入。
所以我有两种可能性,但我不知道如何实现这两种可能性:
1) 使用 class,并且有一个方法应该使用那个 class 的所有函数和属性进行并行化。即:运行 并行循环中的方法,但有机会读取对象的属性而不创建对象的副本。
2) 在 运行 并行化进程之前只需要一个大的 main 并定义全局变量。
例如:
from joblib import Parallel, delayed
def func(x,y,z):
#do something
a = func0(x,y) #whatever function
a = func1(a,z) #whatever function
return a
if name==“__main__””:
#a lot of stuff in which you create y and z
global y,z
result = Parallel(n_jobs=2)(delayed(func)(i,y,z)for i in range(10))
所以问题是,当我到达并行函数时,y 和 z 已经定义,它们只是查找数据,我的问题是如何将这些值传递给并行函数,而不 python 为每个作业创建一个副本?
如果您只需要将列表传递给一些并行进程,我会使用内置的线程模块。据我所知你的问题,这就是你所需要的,你可以将参数传递给线程。
这是一个基本的线程设置:
import threading
def func(x, y):
print(x, y) # random example
x, y = "foo", "bar"
threads = []
for _ in range(10): # create 10 threads
t = threading.Thread(target=func, args=(x, y,))
threads.append(t)
t.start()
for t in threads:
t.join() # waits for the thread to complete
但是,如果您需要以线程安全的方式跟踪该列表,您将需要使用队列:
import threading, queue
# build a thread-safe list
my_q = queue.Queue()
for i in range(1000):
my_q.put(i)
# here is your worker function
def worker(queue):
while not queue.empty():
task = queue.get() # get the next value from the queue
print(task)
queue.task_done() # when you are done tell the queue that this task is complete
# spin up some threads
threads = []
for _ in range(10):
t = threading.Thread(target=worker, args=(my_q,))
threads.append(t)
t.start()
my_q.join() # joining the queue means your code will wait here until the queue is empty
现在要回答有关共享状态的问题,您可以创建一个对象来保存您的变量。通过这种方式,您可以传递对象本身,而不是将变量的副本传递给每个线程(我相信这称为 Borg,但我在这方面可能有点错误)。执行此操作时,如果您计划对共享变量进行任何更改,则会导入它以确保它们是线程安全的。例如,如果两个线程试图同时递增一个数字,您可能会丢失该更改,因为一个线程会覆盖另一个线程。为了防止这种情况,我们使用 threading.Lock
对象。 (如果你不关心这个,就忽略下面所有的锁)。
还有其他方法可以做到这一点,但我发现这种方法易于理解且非常灵活:
import threading
# worker function
def worker(vars, lock):
with lock:
vars.counter += 1
print(f"{threading.current_thread().name}: counter = {vars.counter}")
# this holds your variables to be referenced by threads
class Vars(object):
counter = 0
vars = Vars()
lock = threading.Lock()
# spin up some threads
threads = []
for _ in range(10):
t = threading.Thread(target=worker, args=(vars, lock, ))
threads.append(t)
t.start()
for t in threads:
t.join()
我需要 运行 对输入列表进行并行处理,但在处理过程中使用上面代码中定义的所有变量和函数。但是这个过程本身可以并行化,因为它只依赖于一个变量,即列表的输入。 所以我有两种可能性,但我不知道如何实现这两种可能性:
1) 使用 class,并且有一个方法应该使用那个 class 的所有函数和属性进行并行化。即:运行 并行循环中的方法,但有机会读取对象的属性而不创建对象的副本。
2) 在 运行 并行化进程之前只需要一个大的 main 并定义全局变量。
例如:
from joblib import Parallel, delayed
def func(x,y,z):
#do something
a = func0(x,y) #whatever function
a = func1(a,z) #whatever function
return a
if name==“__main__””:
#a lot of stuff in which you create y and z
global y,z
result = Parallel(n_jobs=2)(delayed(func)(i,y,z)for i in range(10))
所以问题是,当我到达并行函数时,y 和 z 已经定义,它们只是查找数据,我的问题是如何将这些值传递给并行函数,而不 python 为每个作业创建一个副本?
如果您只需要将列表传递给一些并行进程,我会使用内置的线程模块。据我所知你的问题,这就是你所需要的,你可以将参数传递给线程。
这是一个基本的线程设置:
import threading
def func(x, y):
print(x, y) # random example
x, y = "foo", "bar"
threads = []
for _ in range(10): # create 10 threads
t = threading.Thread(target=func, args=(x, y,))
threads.append(t)
t.start()
for t in threads:
t.join() # waits for the thread to complete
但是,如果您需要以线程安全的方式跟踪该列表,您将需要使用队列:
import threading, queue
# build a thread-safe list
my_q = queue.Queue()
for i in range(1000):
my_q.put(i)
# here is your worker function
def worker(queue):
while not queue.empty():
task = queue.get() # get the next value from the queue
print(task)
queue.task_done() # when you are done tell the queue that this task is complete
# spin up some threads
threads = []
for _ in range(10):
t = threading.Thread(target=worker, args=(my_q,))
threads.append(t)
t.start()
my_q.join() # joining the queue means your code will wait here until the queue is empty
现在要回答有关共享状态的问题,您可以创建一个对象来保存您的变量。通过这种方式,您可以传递对象本身,而不是将变量的副本传递给每个线程(我相信这称为 Borg,但我在这方面可能有点错误)。执行此操作时,如果您计划对共享变量进行任何更改,则会导入它以确保它们是线程安全的。例如,如果两个线程试图同时递增一个数字,您可能会丢失该更改,因为一个线程会覆盖另一个线程。为了防止这种情况,我们使用 threading.Lock
对象。 (如果你不关心这个,就忽略下面所有的锁)。
还有其他方法可以做到这一点,但我发现这种方法易于理解且非常灵活:
import threading
# worker function
def worker(vars, lock):
with lock:
vars.counter += 1
print(f"{threading.current_thread().name}: counter = {vars.counter}")
# this holds your variables to be referenced by threads
class Vars(object):
counter = 0
vars = Vars()
lock = threading.Lock()
# spin up some threads
threads = []
for _ in range(10):
t = threading.Thread(target=worker, args=(vars, lock, ))
threads.append(t)
t.start()
for t in threads:
t.join()