Reduce execution time on huge list generation
I'm new to Python, and I'm trying to generate some huge lists (filled with random letters). Right now, 2,000,000 lines take about 75-80 seconds on my machine.
import timeit
import random, string
global_tab = []
global_nb_loop = 2000000
print("Generate %d lines" % global_nb_loop)
global_tab = []
for x in range(global_nb_loop):
    global_tab.append(("".join([random.choice(string.ascii_letters) for i in range(15)]), "".join([random.choice(string.digits) for i in range(2)])))
print("%d lines generated" % len(global_tab))
Result of the Linux time command:
$ time python3 DEV/PyETL/generateList.py
Generate 2000000 lines
2000000 lines generated
real 1m16.844s
user 1m16.609s
sys 0m0.203s
When I monitored system resources, I was surprised to see only 1 core at 100%, rather than 4 cores as on a Windows machine I also tested on.
Of course I tried to apply some threads, but I ran into a problem: it takes more time than running on a single core. Maybe threads aren't the solution, or maybe I'm using them wrong.
Here is the new code:
import random, string
import threading
global_tab = []
global_nb_threads = 4
global_nb_loop = 2000000
threadLock = threading.Lock()
class generateList(threading.Thread):
    def __init__(self, name):
        threading.Thread.__init__(self)
        self.name = name

    def run(self):
        global global_tab
        self.tab = []
        print("[%s] Generate %d lines" % (self.name, int(global_nb_loop/global_nb_threads)))
        # divide the desired number of lines by the number of threads
        for x in range(int(global_nb_loop/global_nb_threads)):
            self.tab.append(("".join([random.choice(string.ascii_letters) for i in range(15)]), "".join([random.choice(string.digits) for i in range(2)])))
        threadLock.acquire()
        global_tab += self.tab
        threadLock.release()
        del self.tab
        print("[%s] %d lines in list" % (self.name, len(global_tab)))

for i in range(global_nb_threads):
    # Create threads
    t = generateList("Thread-" + str(i))
    # Start
    t.start()
for i in range(global_nb_threads):
    # Wait for threads to finish
    t.join()
Execution:
$ time python3 DEV/PyETL/generateListThreads.py
[Thread-0] Generate 500000 lines
[Thread-1] Generate 500000 lines
[Thread-2] Generate 500000 lines
[Thread-3] Generate 500000 lines
[Thread-3] 500000 lines in list
[Thread-0] 1000000 lines in list
[Thread-2] 1500000 lines in list
[Thread-1] 2000000 lines in list
real 1m40.858s
user 1m41.208s
sys 0m0.916s
That's 32 more seconds than the run with 1 core at 100%, yet monitoring shows 8 cores simultaneously loaded at 20-40%.
Since all the threads work at the same time, each generating fewer lines, and synchronization is only needed to update one global variable, shouldn't the execution time be lower than with a single core?
I'm pretty sure your lock is not necessary and is slowing you down. (Edit: actually, I just noticed the lock is used after the majority of the work is done, so it's not really relevant.)
global_tab += self.tab
is (I think) atomic through the Python GIL. (Actually, the reference I found only makes this claim for list.extend(), so use that instead. Here's another reference: Are lists thread safe?)
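As a quick illustration of that claim, here is a minimal sketch (the shared and worker names are made up for this example) in which several threads extend a shared list without any lock; it relies on the CPython GIL making a single extend() call effectively atomic, which is an implementation detail:

import threading

shared = []

def worker(chunk):
    # A single list.extend() call is effectively atomic under the CPython GIL,
    # so no items are lost here even without a lock (implementation detail!).
    shared.extend(chunk)

threads = [threading.Thread(target=worker, args=([i] * 1000,)) for i in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(len(shared))  # expect 4000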
Alternatively, I would try multiprocessing.imap_unordered with a large chunksize. The downside is that the results are sent over a stream, but your random string processing might overshadow that.
import multiprocessing
import random
import string
def randomword(x):
    return ''.join(random.choice(string.ascii_letters) for i in range(15))
pool = multiprocessing.Pool(8)
results = pool.imap_unordered(randomword, range(100))
print([r for r in results])
For 2 million strings (I changed it to print the length):
$ time python r.py
2000000
real 0m38.305s
user 1m31.717s
sys 0m25.853s
I also tried cleaning up your version a bit, and got:
$ time python rr.py
[Thread-0] Generate 250000 lines
[Thread-1] Generate 250000 lines
[Thread-2] Generate 250000 lines
[Thread-3] Generate 250000 lines
[Thread-4] Generate 250000 lines
[Thread-5] Generate 250000 lines
[Thread-6] Generate 250000 lines
[Thread-7] Generate 250000 lines
[Thread-4] 250000 lines in list
[Thread-1] 500000 lines in list
[Thread-7] 750000 lines in list
[Thread-0] 1000000 lines in list
[Thread-6] 1250000 lines in list
[Thread-2] 1500000 lines in list
[Thread-3] 1750000 lines in list
[Thread-5] 2000000 lines in list
real 0m22.113s
user 0m24.969s
sys 0m5.537s
Some significant changes:
- Use xrange() on the large ranges (well, Python 3 already does this).
- Remove the thread lock.
- Use extend() on the global list.
(By the way, my results were roughly the same when just appending to global_tab and omitting the temporary per-thread list; see the sketch after the code below.)
import random, string
import threading
global_tab = []
global_nb_threads = 8
global_nb_loop = 2000000
class generateList(threading.Thread):
    def __init__(self, name):
        threading.Thread.__init__(self)
        self.name = name

    def run(self):
        global global_tab
        self.tab = []
        print("[%s] Generate %d lines" % (self.name, int(global_nb_loop/global_nb_threads)))
        for x in range(int(global_nb_loop/global_nb_threads)):
            self.tab.append(("".join([random.choice(string.ascii_letters) for i in range(15)]), "".join([random.choice(string.digits) for i in range(2)])))
        global_tab.extend(self.tab)
        print("[%s] %d lines in list" % (self.name, len(global_tab)))

for i in range(global_nb_threads):
    t = generateList("Thread-" + str(i))
    t.start()
for i in range(global_nb_threads):
    t.join()
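For reference, a minimal sketch of that append-directly variant (reconstructed for illustration, not the exact code I timed; the worker function is made up):

import random, string, threading

global_tab = []

def worker(n):
    # Append straight to the shared list; list.append is atomic in CPython.
    for _ in range(n):
        global_tab.append(("".join(random.choice(string.ascii_letters) for _ in range(15)),
                           "".join(random.choice(string.digits) for _ in range(2))))

threads = [threading.Thread(target=worker, args=(250000,)) for _ in range(8)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(len(global_tab))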
...however, a single thread is still slightly faster, at 16 seconds.
If I tweak multiprocessing, I can get it down to 6 seconds:
size = 2000000
processes = 8
pool = multiprocessing.Pool(processes)
results = [r for r in pool.imap_unordered(randomword, range(size), chunksize=int(size/processes))]
print(len(results))
Output:
$ time python r.py
2000000
real 0m5.713s
user 0m35.594s
sys 0m0.546s
...so I think that's my final answer: use multiprocessing.
CPython implementation detail: In CPython, due to the Global
Interpreter Lock, only one thread can execute Python code at once
(even though certain performance-oriented libraries might overcome
this limitation). If you want your application to make better use of
the computational resources of multi-core machines, you are advised to
use multiprocessing. However, threading is still an appropriate model
if you want to run multiple I/O-bound tasks simultaneously.
Basically this means that threads in Python will not improve performance unless the threads spend most of their time waiting for something to happen. Multiprocessing works well in Python, but since processes do not share any objects or global state, the programming model for multiprocessing is slightly different. Here's an example of how multiprocessing can be used:
import multiprocessing
import random
import string
def randomData(i):
    # Note: random.sample picks without replacement, so a generated string
    # never repeats a letter or digit (unlike random.choice in the question).
    data = ("".join(random.sample(string.ascii_letters, 15)),
            "".join(random.sample(string.digits, 2)))
    return data

global_nb_loop = 2000000
pool = multiprocessing.Pool(8)
results = pool.imap(randomData, range(global_nb_loop))
global_tab = list(results)
print(len(global_tab))
The multiprocessing module has many versions of map and apply, e.g. imap, map_async, etc. Browse the documentation to find the one that best fits your problem.
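For instance, here is a quick sketch of how a few of those variants differ (the square function is just a stand-in for this example):

import multiprocessing

def square(x):
    return x * x

if __name__ == '__main__':
    with multiprocessing.Pool(4) as pool:
        ordered = pool.map(square, range(10))          # blocks until done, keeps order
        lazy = list(pool.imap(square, range(10)))      # lazy iterator, keeps order
        async_res = pool.map_async(square, range(10))  # returns an AsyncResult immediately
        print(ordered, lazy, async_res.get())          # .get() blocks for the result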
Since you are dealing with a large amount of data, I suggest taking a look at numpy. Generally numpy is slower than lists but more memory efficient, and it is very well suited to many vectorized operations. You can always go the multiprocessing route, even with numpy.
Here's a version that runs 3x faster than the original in the question (for reference, the original ran in 30.3 seconds on my machine).
import numpy as np
def numpy_test(N=2000000):
    global_nb_loop = N
    global_tab = []
    asc_list = list('abcdefghijklmnopqrstuvwxyz')
    print("Generate %d lines" % global_nb_loop)
    # One vectorized draw for all the letters and all the two-digit numbers at once.
    global_tab = [(u.tostring(), str(v)) for u, v in zip(np.random.choice(asc_list, (N, 15)), np.random.randint(10, 100, N))]
    print("%d lines generated" % len(global_tab))
In [306]: %timeit numpy_test()
Generate 2000000 lines
2000000 lines generated
Generate 2000000 lines
2000000 lines generated
Generate 2000000 lines
2000000 lines generated
Generate 2000000 lines
2000000 lines generated
1 loop, best of 3: 11.1 s per loop
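If you do want to combine numpy with multiprocessing as mentioned above, a hypothetical sketch (the numpy_chunk helper is invented for this example) could look like this:

import multiprocessing
import numpy as np

def numpy_chunk(n):
    # Each worker generates its own slice of rows with vectorized numpy calls.
    letters = np.array(list('abcdefghijklmnopqrstuvwxyz'))
    words = np.random.choice(letters, (n, 15))
    nums = np.random.randint(10, 100, n)
    return [("".join(u), str(v)) for u, v in zip(words, nums)]

if __name__ == '__main__':
    N, workers = 2000000, 8
    with multiprocessing.Pool(workers) as pool:
        chunks = pool.map(numpy_chunk, [N // workers] * workers)
    global_tab = [row for chunk in chunks for row in chunk]
    print(len(global_tab))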