Why isn't my multithreading speeding things up?
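I'm reading lines of text into a list `rows` and trying to speed up the processing with multithreading. However, it doesn't speed anything up at all. Watching CPU usage on my Mac, I can see the CPU at 145% with multithreading, but there is no speedup.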
import csv
import time
from concurrent.futures import ThreadPoolExecutor

# TimeExtractor is my own class (its import is not shown here)
te = TimeExtractor()

def time_test(text):
    result = te.compute_time(text)
    # print(result)
    return result

if __name__ == "__main__":
    start = time.time()
    rows = []
    with open('data/data.csv', 'r', encoding='utf-8', newline='') as f:
        csvreader = csv.DictReader(f, delimiter='\t', quoting=csv.QUOTE_ALL)
        for row in csvreader:
            rows.append(row['text'])
    # Leaving the with-block waits until every submitted task has finished
    with ThreadPoolExecutor(4) as executor:
        results = executor.map(time_test, rows)
    end = time.time()
    print(end-start)
    print('Done!!!')
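There are a few possible angles to answering this. CPython has something called the GIL (Global Interpreter Lock), which stops more than one thread from executing Python bytecode at the same time, so pure-Python code cannot run in parallel across threads. This means threads are much better suited to speeding up I/O-heavy code than CPU-bound computation, because computation has far fewer points where it blocks on I/O and releases the GIL for other threads. The result is that a CPU-bound application pays the cost of thread context switching without actually gaining any parallel performance.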
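To see the I/O-bound versus CPU-bound distinction concretely, here is a minimal sketch that times the same thread pool on two kinds of task. It assumes `time.sleep` as a stand-in for blocking I/O (it releases the GIL while waiting); `io_task`, `cpu_task` and `timed` are illustrative names, not from the question.

```python
import time
from concurrent.futures import ThreadPoolExecutor

def io_task(_):
    # time.sleep stands in for blocking I/O; like real I/O, it releases the GIL
    time.sleep(0.2)

def cpu_task(_):
    # Pure-Python busy loop: the thread holds the GIL while it runs
    count = 2_000_000
    while count > 0:
        count -= 1

def timed(fn, workers, n_tasks=4):
    """Run fn n_tasks times on a pool of `workers` threads; return wall time."""
    start = time.perf_counter()
    with ThreadPoolExecutor(workers) as executor:
        list(executor.map(fn, range(n_tasks)))
    return time.perf_counter() - start

if __name__ == "__main__":
    print(f"I/O-bound: 1 thread {timed(io_task, 1):.2f}s, 4 threads {timed(io_task, 4):.2f}s")
    print(f"CPU-bound: 1 thread {timed(cpu_task, 1):.2f}s, 4 threads {timed(cpu_task, 4):.2f}s")
```

On a standard CPython build with the GIL, you would expect the I/O-bound run to drop to roughly a quarter of its serial time with 4 threads, while the CPU-bound run stays about the same.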
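If you really do have a CPU-bound function, you can get your speedup by batching your work with multiprocessing instead of multithreading. Separate processes can run in parallel, but they incur the extra cost of serializing the input values to send them to the workers, so the computation needs to outweigh that (usually small) serialization cost.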
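A minimal sketch of the batching idea, assuming the per-row work is pure-Python and CPU-bound. `process_text` is a hypothetical stand-in for `te.compute_time`, since `TimeExtractor` isn't shown; the `chunksize` argument of `ProcessPoolExecutor.map` is what groups rows into batches so the pickling overhead is amortized.

```python
import time
from concurrent.futures import ProcessPoolExecutor

def process_text(text):
    # Hypothetical stand-in for te.compute_time(text): a deliberately
    # CPU-heavy pure-Python loop, since the real TimeExtractor isn't shown.
    total = 0
    for _ in range(200):
        for ch in text:
            total += ord(ch)
    return total

if __name__ == "__main__":
    rows = [f"row {i}: some text to parse" for i in range(2000)]

    start = time.perf_counter()
    serial = [process_text(t) for t in rows]
    print(f"serial:    {time.perf_counter() - start:.2f}s")

    start = time.perf_counter()
    # chunksize sends rows to the workers in batches of 100, so the pickling
    # and inter-process overhead is paid per batch rather than per row.
    with ProcessPoolExecutor() as executor:
        parallel = list(executor.map(process_text, rows, chunksize=100))
    print(f"processes: {time.perf_counter() - start:.2f}s")

    assert parallel == serial  # same results, in the same order
```

With the real code, the function passed to `map` must be picklable (defined at module level). If `TimeExtractor` is expensive to construct, one option is to build it once per worker via the executor's `initializer` argument instead of at import time.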
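Beyond that, we would need to know exactly what your processing function is doing. You might even have a bug where you think you are splitting up your work but actually aren't.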
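One quick way to check whether the work really is being split up is to have each task report where it ran. A minimal sketch, assuming a process pool; `work` is a hypothetical placeholder for your real function, and the short `sleep` only simulates some work.

```python
import os
import time
from collections import Counter
from concurrent.futures import ProcessPoolExecutor

def work(i):
    # Hypothetical task: simulate some work, then report which
    # process actually ran it.
    time.sleep(0.05)
    return i, os.getpid()

if __name__ == "__main__":
    with ProcessPoolExecutor(max_workers=4) as executor:
        results = list(executor.map(work, range(20)))

    per_worker = Counter(pid for _, pid in results)
    print(f"{len(per_worker)} worker process(es):", dict(per_worker))
    # None of the items should have run in the parent process.
    assert os.getpid() not in per_worker
```

If every item reports the same ID as the parent, nothing is being distributed. Very short tasks may also not spread evenly across workers.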
A simplified version of the code using multithreading:
import time
import concurrent.futures
from multiprocessing import cpu_count

num_cpu = cpu_count()
print("CPU Count: ", num_cpu)  # the exact thread count doesn't really matter here
e = concurrent.futures.ThreadPoolExecutor(num_cpu)

def cpu_intensive_task(i):
    print(i, ' : start task')
    # Pure-Python busy loop: the running thread holds the GIL
    count = 10000000 * (i+1)
    while count > 0:
        count -= 1
    print(i, ' : end task')
    return i

start = time.time()
# map() yields results in submission order as the tasks complete
for i in e.map(cpu_intensive_task, range(10)):
    print(i, ' : in loop')
end = time.time()
print('LOOP DONE')
print('Total Time taken: ', (end-start))
Output:
CPU Count: 8
0 : start task
1 : start task
2 : start task
3 : start task
4 : start task
5 : start task
7 : start task
6 : start task
0 : end task
8 : start task
0 : in loop
1 : end task
9 : start task
1 : in loop
2 : end task
2 : in loop
3 : end task
3 : in loop
4 : end task
4 : in loop
5 : end task
5 : in loop
6 : end task
6 : in loop
7 : end task
7 : in loop
8 : end task
8 : in loop
9 : end task
9 : in loop
LOOP DONE
Total Time taken: 30.59025502204895
Note: the loop only finishes after all the threads have finished executing.
The same code without multithreading:
import time

def cpu_intensive_task(i):
    print(i, ' : start task')
    count = 10000000 * (i+1)
    while count > 0:
        count -= 1
    print(i, ' : end task')
    return i

start = time.time()
for i in range(10):
    cpu_intensive_task(i)
    print(i, ' : in loop')
end = time.time()
print('LOOP DONE')
print('Time taken: ', (end-start))
Output:
0 : start task
0 : end task
0 : in loop
1 : start task
1 : end task
1 : in loop
2 : start task
2 : end task
2 : in loop
3 : start task
3 : end task
3 : in loop
4 : start task
4 : end task
4 : in loop
5 : start task
5 : end task
5 : in loop
6 : start task
6 : end task
6 : in loop
7 : start task
7 : end task
7 : in loop
8 : start task
8 : end task
8 : in loop
9 : start task
9 : end task
9 : in loop
LOOP DONE
Time taken: 30.072215795516968
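Note: the time taken is almost the same as with the multithreaded approach (slightly less, in fact). Multithreading doesn't help with this kind of workload.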
The same code using multiprocessing:
import time
from multiprocessing import Process, cpu_count

def cpu_intensive_task(i):
    print(i, ' : start task')
    count = 10000000 * (i+1)
    while count > 0:
        count -= 1
    print(i, ' : end task')
    return i  # a Process target's return value is discarded

if __name__ == '__main__':
    print("CPU Count: ", cpu_count())
    start = time.time()
    processes = []
    # Start one process per task; each has its own interpreter and GIL
    for i in range(10):
        p = Process(target=cpu_intensive_task, args=(i,))
        processes.append(p)
        p.start()
        print(i, ' : in loop')
    print('LOOP END')
    # Wait for every process to finish before stopping the clock
    for p in processes:
        p.join()
    end = time.time()
    print('Total Time Taken: ', (end - start))
Output:
CPU Count: 8
0 : in loop
1 : in loop
2 : in loop
3 : in loop
4 : in loop
5 : in loop
6 : in loop
7 : in loop
8 : in loop
9 : in loop
LOOP END
0 : start task
1 : start task
2 : start task
3 : start task
5 : start task
4 : start task
8 : start task
7 : start task
6 : start task
9 : start task
0 : end task
1 : end task
2 : end task
3 : end task
4 : end task
5 : end task
6 : end task
7 : end task
8 : end task
9 : end task
Total Time Taken: 10.335741996765137
Note: multiprocessing takes only about 1/3 of the time taken by the multithreaded approach.
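The loop above starts one `Process` per task. A slightly more idiomatic sketch (not part of the original answer) uses `concurrent.futures.ProcessPoolExecutor`, which caps the number of worker processes and returns results the same way `ThreadPoolExecutor.map` does, so the two can be compared with the same helper. The loop sizes are scaled down arbitrarily so it runs quickly; timings will vary by machine.

```python
import time
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor

def cpu_intensive_task(i):
    # Same pure-Python countdown as above, scaled down for a quick demo
    count = 200_000 * (i + 1)
    while count > 0:
        count -= 1
    return i

def run(executor_cls, n_tasks=10):
    """Run the tasks on the given executor class; return (results, seconds)."""
    start = time.perf_counter()
    # Each executor class chooses its own default max_workers
    with executor_cls() as executor:
        results = list(executor.map(cpu_intensive_task, range(n_tasks)))
    return results, time.perf_counter() - start

if __name__ == "__main__":
    for cls in (ThreadPoolExecutor, ProcessPoolExecutor):
        results, elapsed = run(cls)
        assert results == list(range(10))
        print(f"{cls.__name__}: {elapsed:.2f}s")
```

Unlike a bare `Process`, the executor also hands back each task's return value.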
From the Python glossary entry for "global interpreter lock":
The mechanism used by the CPython interpreter to assure that only one thread executes Python bytecode at a time. This simplifies the CPython implementation by making the object model (including critical built-in types such as dict) implicitly safe against concurrent access. However, some extension modules, either standard or third-party, are designed so as to release the GIL when doing computationally-intensive tasks such as compression or hashing. Also, the GIL is always released when doing I/O.
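To illustrate the extension-module point: the `hashlib` documentation states that the GIL is released while hashing data larger than 2047 bytes, so threads hashing large buffers can use several cores at once even though they are threads. A minimal sketch; the buffer count and size are arbitrary, and any speedup depends on your machine.

```python
import hashlib
import os
import time
from concurrent.futures import ThreadPoolExecutor

# 8 random buffers of 4 MiB each (sizes chosen arbitrarily for the demo)
BUFFERS = [os.urandom(4 * 1024 * 1024) for _ in range(8)]

def digest(buf):
    # hashlib releases the GIL while hashing large inputs
    return hashlib.sha256(buf).hexdigest()

def timed(workers):
    """Hash all buffers on `workers` threads; return (digests, seconds)."""
    start = time.perf_counter()
    with ThreadPoolExecutor(workers) as executor:
        digests = list(executor.map(digest, BUFFERS))
    return digests, time.perf_counter() - start

if __name__ == "__main__":
    serial, t1 = timed(1)
    threaded, t4 = timed(4)
    assert serial == threaded  # threads change the timing, not the results
    print(f"1 thread: {t1:.2f}s, 4 threads: {t4:.2f}s")
```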
From the introduction to the multiprocessing module documentation:
multiprocessing is a package that supports spawning processes using an API similar to the threading module. The multiprocessing package offers both local and remote concurrency, effectively side-stepping the Global Interpreter Lock by using subprocesses instead of threads. Due to this, the multiprocessing module allows the programmer to fully leverage multiple processors on a given machine.
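So, for CPU-bound pure-Python code like this, only multiprocessing lets you use multiple processors and achieve true parallelism.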