并行 运行 一段 python 代码的简单方法?
A simple way to run a piece of python code in parallel?
我有这个非常简单的 python 代码:
Test = 1;
def para():
while(True):
if Test > 10:
print("Test is bigger than ten");
time.sleep(1);
para(); # I want this to start in parallel, so that the code below keeps executing without waiting for this function to finish
while(True):
Test = random.randint(1,42);
time.sleep(1);
if Test == 42:
break;
...#stop the parallel execution of the para() here (kill it)
..some other code here
基本上,我想 运行 函数 para() 与其他代码并行,这样它下面的代码就不必等待 para() 结束。
但是,我希望能够在并行 运行ning 时访问 para() 内部的 Test 变量的当前值(如上面的代码示例所示)。后来,当我决定并行完成 para() 运行ning 时,我想知道如何从主线程以及并行 [=22] 中杀死它=]ning para() 本身(自我终止)。
我已经阅读了一些关于线程的教程,但几乎每个教程都采用不同的方法,而且我在理解其中的一些方面遇到了困难,所以我想知道,运行 a 最简单的方法是什么并行的一段代码。
谢谢。
好的,首先,这是对您问题的逐字回答,并以最简单的方式回答。之后,我们将通过两个示例更全面地回答,这两个示例展示了执行此操作以及在主代码和并行代码之间共享数据访问的两种方法。
import random
from threading import Thread
import time
Test = 1;
stop = False
def para():
while not stop:
if Test > 10:
print("Test is bigger than ten");
time.sleep(1);
# I want this to start in parallel, so that the code below keeps executing without waiting for this function to finish
thread = Thread(target=para)
thread.start()
while(True):
Test = random.randint(1,42);
time.sleep(1);
if Test == 42:
break;
#stop the parallel execution of the para() here (kill it)
stop = True
thread.join()
#..some other code here
print( 'we have stopped' )
现在,更完整的答案:
在下文中,我们展示了两个代码示例(在下面列出),它们演示了 (a) 使用线程接口的并行执行,以及 (b) 使用多处理接口。您选择使用哪一个取决于您要做什么。当第二个线程的目的是等待 I/O 时,线程可能是一个不错的选择,而当第二个线程用于进行 cpu 密集计算时,多进程可能是一个不错的选择。
在您的示例中,主代码更改了一个变量,而并行代码仅检查了该变量。如果你想改变两者的变量,情况就不同了,例如重置共享计数器。所以,我们也会向您展示如何做到这一点。
在以下示例代码中:
变量“计数器”和“运行”和“” lock”在主程序和并行执行的代码之间共享。
函数myfunc()是并行执行的。它循环更新 counter 和休眠,直到 运行 被主程序设置为 false。
主程序循环打印 counter 的值,直到它达到 5,此时它重置计数器。然后,在它再次达到 5 后,它将 运行 设置为 false,最后,它在退出自身之前等待线程或进程退出。
您可能会注意到 counter 在对 lock.acquire() 和 [ 的调用中递增=81=]() 在第一个示例中,或者在第二个示例中使用 lock。
递增计数器包括三个步骤,(1) 读取当前值,(2) 将其加一,然后 (3) 将结果存储回计数器。当一个线程试图在发生这种情况的同时设置计数器时,问题就来了。
我们通过让主程序和并行代码在更改变量之前获取 lock 来解决这个问题,然后 release当他们完成时。如果锁已经被占用,程序或并行代码会一直等待直到它被释放。这 同步 他们更改共享数据(即计数器)的访问权限。 (此外,另一种同步请参见 semaphore)。
有了这个介绍,这里是第一个使用线程的例子:
# Parallel code with shared variables, using threads
from threading import Lock, Thread
from time import sleep
# Variables to be shared across threads
counter = 0
run = True
lock = Lock()
# Function to be executed in parallel
def myfunc():
# Declare shared variables
global run
global counter
global lock
# Processing to be done until told to exit
while run:
sleep( 1 )
# Increment the counter
lock.acquire()
counter = counter + 1
lock.release()
# Set the counter to show that we exited
lock.acquire()
counter = -1
lock.release()
print( 'thread exit' )
# ----------------------------
# Launch the parallel function as a thread
thread = Thread(target=myfunc)
thread.start()
# Read and print the counter
while counter < 5:
print( counter )
sleep( 1 )
# Change the counter
lock.acquire()
counter = 0
lock.release()
# Read and print the counter
while counter < 5:
print( counter )
sleep( 1 )
# Tell the thread to exit and wait for it to exit
run = False
thread.join()
# Confirm that the thread set the counter on exit
print( counter )
这里是第二个例子,它使用了多处理。请注意,访问共享变量涉及一些额外的步骤。
from time import sleep
from multiprocessing import Process, Value, Lock
def myfunc(counter, lock, run):
while run.value:
sleep(1)
with lock:
counter.value += 1
print( "thread %d"%counter.value )
with lock:
counter.value = -1
print( "thread exit %d"%counter.value )
# =======================
counter = Value('i', 0)
run = Value('b', True)
lock = Lock()
p = Process(target=myfunc, args=(counter, lock, run))
p.start()
while counter.value < 5:
print( "main %d"%counter.value )
sleep(1)
with lock:
counter.value = 0
while counter.value < 5:
print( "main %d"%counter.value )
sleep(1)
run.value = False
p.join()
print( "main exit %d"%counter.value)
与其手动启动线程,不如使用 multiprocessing.pool。多处理部分需要在您使用 map 调用的函数中。然后您可以使用 pool.imap.
而不是地图
import multiprocessing
import time
def func(x):
time.sleep(x)
return x + 2
if __name__ == "__main__":
p = multiprocessing.Pool()
start = time.time()
for x in p.imap(func, [1,5,3]):
print("{} (Time elapsed: {}s)".format(x, int(time.time() - start)))
另请查看:
multiprocessing.Pool: What's the difference between map_async and imap?
另外值得一试的是 functools.partials,它可用于传递多个变量(除了列表之外)。
另一个技巧:有时你并不真正需要多处理(如在处理器的多个内核中),而只是多个线程来同时查询具有多个连接的数据库。在那种情况下,只需执行 from multiprocessing.dummy import Pool 并可以避免 python 产生一个单独的进程(这会使您失去对所有未传递给函数的名称空间的访问权限),但保留所有池的好处,仅在单个 cpu 核心中。这就是关于 python 多处理(使用多核)和多线程(仅使用一个进程并保持全局解释器锁不变)的全部信息。
另一个小建议:始终尝试在没有任何池的情况下首先使用 map。一旦确定一切正常,然后在下一步中切换到 pool.imap。
我有这个非常简单的 python 代码:
Test = 1;
def para():
while(True):
if Test > 10:
print("Test is bigger than ten");
time.sleep(1);
para(); # I want this to start in parallel, so that the code below keeps executing without waiting for this function to finish
while(True):
Test = random.randint(1,42);
time.sleep(1);
if Test == 42:
break;
...#stop the parallel execution of the para() here (kill it)
..some other code here
基本上,我想 运行 函数 para() 与其他代码并行,这样它下面的代码就不必等待 para() 结束。 但是,我希望能够在并行 运行ning 时访问 para() 内部的 Test 变量的当前值(如上面的代码示例所示)。后来,当我决定并行完成 para() 运行ning 时,我想知道如何从主线程以及并行 [=22] 中杀死它=]ning para() 本身(自我终止)。
我已经阅读了一些关于线程的教程,但几乎每个教程都采用不同的方法,而且我在理解其中的一些方面遇到了困难,所以我想知道,运行 a 最简单的方法是什么并行的一段代码。
谢谢。
好的,首先,这是对您问题的逐字回答,并以最简单的方式回答。之后,我们将通过两个示例更全面地回答,这两个示例展示了执行此操作以及在主代码和并行代码之间共享数据访问的两种方法。
import random
from threading import Thread
import time
Test = 1;
stop = False
def para():
while not stop:
if Test > 10:
print("Test is bigger than ten");
time.sleep(1);
# I want this to start in parallel, so that the code below keeps executing without waiting for this function to finish
thread = Thread(target=para)
thread.start()
while(True):
Test = random.randint(1,42);
time.sleep(1);
if Test == 42:
break;
#stop the parallel execution of the para() here (kill it)
stop = True
thread.join()
#..some other code here
print( 'we have stopped' )
现在,更完整的答案:
在下文中,我们展示了两个代码示例(在下面列出),它们演示了 (a) 使用线程接口的并行执行,以及 (b) 使用多处理接口。您选择使用哪一个取决于您要做什么。当第二个线程的目的是等待 I/O 时,线程可能是一个不错的选择,而当第二个线程用于进行 cpu 密集计算时,多进程可能是一个不错的选择。
在您的示例中,主代码更改了一个变量,而并行代码仅检查了该变量。如果你想改变两者的变量,情况就不同了,例如重置共享计数器。所以,我们也会向您展示如何做到这一点。
在以下示例代码中:
变量“计数器”和“运行”和“” lock”在主程序和并行执行的代码之间共享。
函数myfunc()是并行执行的。它循环更新 counter 和休眠,直到 运行 被主程序设置为 false。
主程序循环打印 counter 的值,直到它达到 5,此时它重置计数器。然后,在它再次达到 5 后,它将 运行 设置为 false,最后,它在退出自身之前等待线程或进程退出。
您可能会注意到 counter 在对 lock.acquire() 和 [ 的调用中递增=81=]() 在第一个示例中,或者在第二个示例中使用 lock。
递增计数器包括三个步骤,(1) 读取当前值,(2) 将其加一,然后 (3) 将结果存储回计数器。当一个线程试图在发生这种情况的同时设置计数器时,问题就来了。
我们通过让主程序和并行代码在更改变量之前获取 lock 来解决这个问题,然后 release当他们完成时。如果锁已经被占用,程序或并行代码会一直等待直到它被释放。这 同步 他们更改共享数据(即计数器)的访问权限。 (此外,另一种同步请参见 semaphore)。
有了这个介绍,这里是第一个使用线程的例子:
# Parallel code with shared variables, using threads
from threading import Lock, Thread
from time import sleep
# Variables to be shared across threads
counter = 0
run = True
lock = Lock()
# Function to be executed in parallel
def myfunc():
# Declare shared variables
global run
global counter
global lock
# Processing to be done until told to exit
while run:
sleep( 1 )
# Increment the counter
lock.acquire()
counter = counter + 1
lock.release()
# Set the counter to show that we exited
lock.acquire()
counter = -1
lock.release()
print( 'thread exit' )
# ----------------------------
# Launch the parallel function as a thread
thread = Thread(target=myfunc)
thread.start()
# Read and print the counter
while counter < 5:
print( counter )
sleep( 1 )
# Change the counter
lock.acquire()
counter = 0
lock.release()
# Read and print the counter
while counter < 5:
print( counter )
sleep( 1 )
# Tell the thread to exit and wait for it to exit
run = False
thread.join()
# Confirm that the thread set the counter on exit
print( counter )
这里是第二个例子,它使用了多处理。请注意,访问共享变量涉及一些额外的步骤。
from time import sleep
from multiprocessing import Process, Value, Lock
def myfunc(counter, lock, run):
while run.value:
sleep(1)
with lock:
counter.value += 1
print( "thread %d"%counter.value )
with lock:
counter.value = -1
print( "thread exit %d"%counter.value )
# =======================
counter = Value('i', 0)
run = Value('b', True)
lock = Lock()
p = Process(target=myfunc, args=(counter, lock, run))
p.start()
while counter.value < 5:
print( "main %d"%counter.value )
sleep(1)
with lock:
counter.value = 0
while counter.value < 5:
print( "main %d"%counter.value )
sleep(1)
run.value = False
p.join()
print( "main exit %d"%counter.value)
与其手动启动线程,不如使用 multiprocessing.pool。多处理部分需要在您使用 map 调用的函数中。然后您可以使用 pool.imap.
而不是地图import multiprocessing
import time
def func(x):
time.sleep(x)
return x + 2
if __name__ == "__main__":
p = multiprocessing.Pool()
start = time.time()
for x in p.imap(func, [1,5,3]):
print("{} (Time elapsed: {}s)".format(x, int(time.time() - start)))
另请查看: multiprocessing.Pool: What's the difference between map_async and imap?
另外值得一试的是 functools.partials,它可用于传递多个变量(除了列表之外)。
另一个技巧:有时你并不真正需要多处理(如在处理器的多个内核中),而只是多个线程来同时查询具有多个连接的数据库。在那种情况下,只需执行 from multiprocessing.dummy import Pool 并可以避免 python 产生一个单独的进程(这会使您失去对所有未传递给函数的名称空间的访问权限),但保留所有池的好处,仅在单个 cpu 核心中。这就是关于 python 多处理(使用多核)和多线程(仅使用一个进程并保持全局解释器锁不变)的全部信息。
另一个小建议:始终尝试在没有任何池的情况下首先使用 map。一旦确定一切正常,然后在下一步中切换到 pool.imap。