Specific stdout line order with multi-processing
I have the following problem: g data generators, each one a separate Process, each producing n values that I need to write out in a round-robin fashion (over g). So for generators A, B and C the output must be ordered like:
<value 1 of A>
<value 1 of B>
<value 1 of C>
<value 2 of A>
<value 2 of B>
<value 2 of C>
<value 3 of A>
...
Functionally it works, but it is still a lot slower than single-threaded. Since I need to generate a huge number of values, I want to parallelize the generation (and possibly also the output, which I haven't managed so far).
Code (updated to use mp.Pipe and to include a separate data generator):
import random
import argparse
import multiprocessing as mp
import sys


class DataGenerator:
    _id = 0

    def __init__(self, id: int, **kwargs):
        self._id = id

    def generate(self):
        return '%03d:%4x' % (self._id, random.getrandbits(16))


def produce(generator, pipe, num: int):
    p_read, p_write = pipe
    i = 0
    while i < num:
        i += 1
        p_write.send(generator.generate())


def consume(pipes: list, num: int):
    i = 0
    p_count = len(pipes)
    while i < num:
        # enforce round-robin printing...
        p_idx = i % p_count
        p_read, p_write = pipes[p_idx]
        i += 1
        sys.stdout.write(p_read.recv() + '\n')


def multi_processed(num: int, processes: int):
    per_process = int(num / processes)
    if num % processes != 0:
        per_process += 1
    q = list()
    g = list()
    for i in range(processes):
        q.append(mp.Pipe(False))
        g.append(DataGenerator(i + 1))
    procs = list()
    for i in range(processes):
        p = mp.Process(target=produce, args=(g[i], q[i], per_process))
        p.start()
        procs.append(p)
    consume(q, num)
    for p in procs:
        p.join()


def single_threaded(num: int, processes: int):
    g = list()
    for i in range(processes):
        g.append(DataGenerator(i + 1))
    for i in range(num):
        j = i % processes
        print(g[j].generate())


def main():
    parser = argparse.ArgumentParser(description='Threading test')
    parser.add_argument(
        '--count', '-c', dest='count', type=int, default=1000000,
        help='How many total iterations (default: 1000000)')
    parser.add_argument(
        '--threads', '-t', dest='threads', type=int, default=1,
        help='how many threads to use (default: 1 - single-threaded)')
    args = parser.parse_args()

    if args.threads > 1:
        multi_processed(args.count, abs(args.threads))
    else:
        single_threaded(args.count, mp.cpu_count())


if __name__ == '__main__':
    main()
When executed it keeps all 4 of my CPU cores busy, but performance-wise it is slower than sequential execution:
Execution time for 10,000,000 total values, single-threaded:
$ time python3 threading_output.py --threads 1 --count 10000000 | wc -l
10000000
real 0m16.557s
user 0m16.443s
sys 0m0.437s
...and the same for the multiprocessing implementation:
$ time python3 threading_output.py --threads 4 --count 10000000 | wc -l
10000000
real 1m6.446s
user 3m10.073s
sys 0m54.274s
Not using mp.Queue at all and printing the generated values directly inside the produce loop gives me about 9.6 seconds, but of course the output lines then have no deterministic order.
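For reference, that ~9.6 second variant was essentially the following (a minimal sketch, assuming the imports and the DataGenerator class from the code above; the function names are just for illustration, and ordering across processes is lost):

# Sketch of the "print directly in the producer" variant: no queue/pipe,
# each process writes straight to stdout, so lines from different
# processes interleave arbitrarily.
def produce_direct(generator, num: int):
    for _ in range(num):
        sys.stdout.write(generator.generate() + '\n')

def multi_processed_direct(num: int, processes: int):
    per_process = -(-num // processes)  # ceiling division
    procs = [mp.Process(target=produce_direct,
                        args=(DataGenerator(i + 1), per_process))
             for i in range(processes)]
    for p in procs:
        p.start()
    for p in procs:
        p.join()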
How can I speed this up?
Update #1
Using mp.Array as a shared buffer is not an option, since for an array of strings I would need the ctype c_wchar_p, according to the docs.
Update #2
Replaced mp.Queue(1000) with mp.Pipe(False), which brings the time for 10 million values down to about 45 seconds. The producer processes now use much less CPU and the consumer is the clear bottleneck:
PID USER PR NI VIRT RES SHR S %CPU %MEM TIME+ COMMAND
5943 ancoron 20 0 28420 15168 8348 R 99.9 0.0 0:12.23 `- python3 threading_output.py --threads 4 --count 10000000
5947 ancoron 20 0 28284 10336 3536 R 29.9 0.0 0:03.69 `- python3 threading_output.py --threads 4 --count 10000000
5948 ancoron 20 0 28284 10336 3536 R 30.8 0.0 0:03.71 `- python3 threading_output.py --threads 4 --count 10000000
5949 ancoron 20 0 28284 10336 3536 R 30.8 0.0 0:03.71 `- python3 threading_output.py --threads 4 --count 10000000
5950 ancoron 20 0 28284 10340 3536 R 29.0 0.0 0:03.58 `- python3 threading_output.py --threads 4 --count 10000000
Update #3
I tried cinda with a simple BytesQueue, which got it down to about 23 seconds. Still slower than single-threaded.
OK, I did some testing and now I'm confused. I made a multi-threaded and an async solution, and neither was particularly great. I also copy-pasted your code and it always hangs, even when it is 'done'.
Note that in my code I used the number it was given as the TID instead of 4 random hex digits, because I wanted to make sure it was doing what you wanted. It's harder to tell the other way, and it can easily be changed back to hex.
Single-threaded:
import random
import sys


def generate():
    return random.randrange(-10, 10)


if len(sys.argv) < 2:
    print("NEED ARGS")
    exit(0)

num = int(sys.argv[1])

for x in range(num):
    for _ in range(x):
        print("[{}]: {}".format(x, generate()))
Multi-threaded:
from concurrent.futures import TimeoutError
from pebble import ThreadPool, ProcessExpired
import random
import multiprocessing as mp
import sys


def generate():
    return random.randrange(-10, 10)


def produce(num):
    #tid = '%04x' % random.getrandbits(16)
    tid = num
    i = 0
    while i < num:
        print('[%s] %3d' % (tid, generate()))
        i += 1


if __name__ == "__main__":
    if len(sys.argv) < 2:
        print("NEED ARGS")
        exit(0)

    num = int(sys.argv[1])

    with ThreadPool(max_workers=mp.cpu_count()) as pool:
        future = pool.map(produce, range(num), timeout=3)
        iterator = future.result()
        while True:
            try:
                result = next(iterator)
            except StopIteration:
                break
            except TimeoutError as error:
                print(error)
                break
            except ProcessExpired as error:
                print(error)
                break
            except Exception as error:
                print(error)
                break
Honestly, I don't see a big change in speed. The multiprocessing one is actually slower, and that's about as bare-bones as it gets. What I just remembered is PyPy, which is known for its computational speed. I really don't want to set it up, but given the simple, repetitive and purely computational nature of your problem, I think it might be your solution.
The benchmarks were:
0.3 seconds for 100 iterations
10 seconds single-threaded and 11 seconds multi-threaded for 1000 iterations
I gave up beyond that because of how long it took. I don't know how to describe it, but every additional order of magnitude means roughly 100x more work. Proof, using Gauss's pattern:
You are computing the sum of every number up to num, which means 1 + 2 + ..., and Gauss's pattern covers that. This should give you a rough idea of how big it gets:
10 as input takes 55 iterations
100 as input takes 5050 iterations
1000 as input takes 500500 iterations
10000 as input takes 50005000 iterations
After putting the data through Excel it comes out as O(n^2), which I guess isn't too bad. The equation is ~0.55x^2 if you're curious.
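To make that growth concrete, here is a small sketch of the closed form behind those numbers (using the 1 + 2 + ... + num reading, which matches the table above):

# Gauss's formula: 1 + 2 + ... + num == num * (num + 1) // 2
def total_iterations(num: int) -> int:
    return num * (num + 1) // 2

for n in (10, 100, 1000, 10000):
    print(n, total_iterations(n))  # 55, 5050, 500500, 50005000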
Would you mind linking the other program variants you made, so I can benchmark them against my own? Because honestly I'd really like to know whether they are working correctly / whether I'm doing a lot of things wrong.
TL;DR: What tests/code are you using, so that I can compare? Have you tried PyPy? Would a database be better than printing the numbers (it would almost certainly be faster)? How did you manage to get your program running so fast single-threaded?
Hope this helps!
Edit: Just to check, you actually want to do the following, right? On the first iteration you print the ID and a random number once; on the second iteration you print the ID and a random number twice. Just checking.
Edit 2: The code should be fixed now.
from concurrent.futures import TimeoutError
from pebble import ThreadPool, ProcessExpired
import random
import multiprocessing as mp
import sys


def generate():
    return random.randrange(-10, 10)


def produce(num):
    tid = '%04x' % random.getrandbits(16)
    for _ in range(num):
        print('[%s] %3d' % (tid, generate()))


if __name__ == "__main__":
    if len(sys.argv) < 3:
        print("NEED ARGS")
        exit(0)

    num = int(sys.argv[1])
    workers = int(sys.argv[2])
    num_per_worker = int(num/workers)

    #The magic numbers here are just optimizations. Feel free to change them
    with ThreadPool(max_workers=workers, max_tasks=50) as pool:
        future = pool.map(produce, (num_per_worker for _ in range(workers)),
                          chunksize=round(num/1024))
        iterator = future.result()
        while True:
            try:
                result = next(iterator)
            except StopIteration:
                break
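Invocation (the filename is just an example here): total count first, then the number of workers:
$ python3 pebble_pool.py 10000000 4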
Edit 3: Round-robin
from pebble import ThreadPool, ProcessExpired
import random
import multiprocessing as mp
import sys
from functools import partial


def generate():
    return random.randrange(-10, 10)


def produce(num, magic_array):
    tid = '%04x' % random.getrandbits(16)
    for _ in range(num):
        magic_array.append('[%s] %3d' % (tid, generate()))


if __name__ == "__main__":
    if len(sys.argv) < 3:
        print("NEED ARGS")
        exit(0)

    num = int(sys.argv[1])
    workers = int(sys.argv[2])
    num_per_worker = int(num/workers)
    magic_array = []

    #This is how the magic array is used as an argument.
    #There's probably a better way to do it, but I don't know it
    part_produce = partial(produce, magic_array=magic_array)

    #The magic numbers here are just optimizations. Feel free to change them
    with ThreadPool(max_workers=workers, max_tasks=50) as pool:
        future = pool.map(part_produce, (num_per_worker for _ in range(workers)), chunksize=num_per_worker)
        iterator = future.result()
        while True:
            try:
                result = next(iterator)
            except StopIteration:
                break

    #This is the important part. For every iteration/worker unit, it will go
    #through the list in steps of iteration/worker units, with a start offset
    #of x
    #Just printing takes about 5 seconds, but I don't think there's a faster
    #method because printing takes a long time anyway
    for x in range(num_per_worker):
        for y in magic_array[x::num_per_worker]:
            print(y)
After fiddling around with a lot of different options, my (final, for now) solution uses a memory-mapped file and no output to stdout at all.
After implementing a change to get closer to one of the generators' output types (a UUID - 36 characters + newline - from a 16-byte value out of the generator), the resulting numbers on my machine are:
Single-threaded
$ time python3 threading_output.py --threads 1 --count 10000000 | wc -l
10000000
real 0m15.915s
user 0m16.045s
sys 0m0.629s
Multiprocessing with cinda BytesQueue
$ time python3 threading_output.py --threads 4 --count 10000000 | wc -l
10000000
real 0m30.005s
user 0m53.543s
sys 0m28.072s
Multiprocessing with mmap
$ time python3 threading_output.py --threads 4 --count 10000000 --output-file test.txt
real 0m6.637s
user 0m18.688s
sys 0m1.265s
That looks a lot better, a speedup factor of roughly 2.4. Although we use a bit more memory here, the system should be able to handle it.
Details
The actual code of the mmap version makes use of slice-notation write access into the memory-mapped file, so that each process writes into its own "slots". To avoid using too much memory, each process also maps only a specific region of the output file (mmap.PAGESIZE * <bytes_per_entry> * 100 * <processes>) and re-maps the next region after mmap.PAGESIZE * 100 entries per process.
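To make the slot layout concrete, a tiny sketch of the offset arithmetic (the 37-byte entry size and 4 processes match the code below; the producer index is picked just for illustration):

entry_size = 37          # 36-char UUID string + newline
processes = 4            # number of producer processes
offset = 1               # index of this producer (0-based), illustrative

seek_relative = processes * entry_size  # distance between two of "my" slots
seek_offset = offset * entry_size       # where my first slot starts

for j in range(3):
    off_start = seek_relative * j + seek_offset
    print(off_start, off_start + entry_size)
# -> 37 74, 185 222, 333 370: every 4th 37-byte slot belongs to producer 1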
Full test code
The mmap version uses the functions mmapped and produce_mm for the implementation:
import random
import argparse
import multiprocessing as mp
import sys
from cinda.ipc.queue import BytesQueue as Queue
from cinda.ipc import free
import mmap
import os


class DataGenerator:
    _id = 0

    def __init__(self, id: int, **kwargs):
        self._id = id

    def generate(self):
        return self._id.to_bytes(8, 'big') + random.getrandbits(64).to_bytes(8, 'big')


def produce(generator, q, num: int):
    i = 0
    while i < num:
        i += 1
        q.put(generator.generate())


def consume(queues: list, num: int):
    i = 0
    q_count = len(queues)
    while i < num:
        # enforce round-robin printing...
        q = queues[i % q_count]
        i += 1
        hex = q.get().hex()
        sys.stdout.write('%8s-%4s-%4s-%4s-%12s\n' % (hex[:8], hex[8:12], hex[12:16], hex[16:20], hex[20:]))


def multi_processed(num: int, processes: int):
    per_process = int(num / processes)
    if num % processes != 0:
        per_process += 1
    q = list()
    g = list()
    for i in range(processes):
        name = 'gen-%d' % i
        free(name)
        q.append(Queue(name, 10000, 16))
        g.append(DataGenerator(i + 1))
    procs = list()
    for i in range(processes):
        p = mp.Process(target=produce, args=(g[i], q[i], per_process))
        p.start()
        procs.append(p)
    consume(q, num)
    for p in procs:
        p.join()


def produce_mm(generator, out_file, num: int, offset: int, num_total: int, processes: int):
    entry_size = 37                         # number of bytes per entry
    seek_relative = processes * entry_size  # relative offset to next slot
    seek_offset = offset * entry_size       # initial slot offset for this process
    buffer_switch = mmap.PAGESIZE * 100     # number of "my" entries per buffer
    buffer_num = buffer_switch * processes  # number of entries to mmap at any time
    buffer_size = buffer_num * entry_size   # actual mmap'd buffer size
    buffer_offset = 0
    size = num_total * entry_size
    with open(out_file, "r+b") as f:
        mem = mmap.mmap(f.fileno(), min(num_total * entry_size, buffer_size), access=mmap.ACCESS_WRITE)

        # generate and write the first entry (saves an if-clause later)
        hex = generator.generate().hex()
        mem[seek_offset:(seek_offset + entry_size)] = ('%8s-%4s-%4s-%4s-%12s\n' % (hex[:8], hex[8:12], hex[12:16], hex[16:20], hex[20:])).encode('US-ASCII')

        (i, j) = (1, 1)
        while i < num:
            # close current, create next buffer and reset entry offset
            if (i % buffer_switch) == 0:
                mem.flush()
                mem.close()
                buffer_offset += buffer_size
                buffer_size = min(size - buffer_offset, buffer_size)
                #sys.stderr.write('New buffer[%d] at %d\n' % (buffer_size, buffer_offset))
                mem = mmap.mmap(f.fileno(), buffer_size, access=mmap.ACCESS_WRITE, offset=buffer_offset)
                j = 0

            # calculate [start:end] offsets for this slot
            off_start = seek_relative * j + seek_offset
            off_end = off_start + entry_size
            if off_end > buffer_size:
                break

            hex = generator.generate().hex()
            try:
                mem[off_start:off_end] = ('%8s-%4s-%4s-%4s-%12s\n' % (hex[:8], hex[8:12], hex[12:16], hex[16:20], hex[20:])).encode('US-ASCII')
            except IndexError as e:
                sys.stderr.write('%s (tried [%d:%d] for mmap size %d, offset %d)\n' % (e, off_start, off_end, buffer_size, j))
                break

            (i, j) = (i + 1, j + 1)
def mmapped(num: int, processes: int, out_file):
    per_process = int(num / processes)
    if num % processes != 0:
        per_process += 1
    with open(out_file, "wb") as f:
        # pre-allocate the full output file by writing a single null byte at the end
        f.seek((num * 37 - 1), os.SEEK_SET)
        f.write(b'\x00')
        f.flush()
    g = list()
    for i in range(processes):
        g.append(DataGenerator(i + 1))
    procs = list()
    for i in range(processes):
        p = mp.Process(target=produce_mm, args=(g[i], out_file, per_process, i, num, processes))
        p.start()
        procs.append(p)
    for p in procs:
        p.join()
def single_threaded(num: int, processes: int):
    g = list()
    for i in range(processes):
        g.append(DataGenerator(i + 1))
    for i in range(num):
        j = i % processes
        hex = g[j].generate().hex()
        sys.stdout.write('%8s-%4s-%4s-%4s-%12s\n' % (hex[:8], hex[8:12], hex[12:16], hex[16:20], hex[20:]))


def main():
    parser = argparse.ArgumentParser(description='Threading test')
    parser.add_argument(
        '--count', '-c', dest='count', type=int, default=1000000,
        help='How many total iterations (default: 1000000)')
    parser.add_argument(
        '--threads', '-t', dest='threads', type=int, default=1,
        help='how many threads to use (default: 1 - single-threaded)')
    parser.add_argument(
        '--output-file', '-o', dest='out_file', type=str,
        help='specify output file to write into')
    args = parser.parse_args()

    if args.threads > 1:
        if args.out_file is None:
            multi_processed(args.count, abs(args.threads))
        else:
            mmapped(args.count, abs(args.threads), args.out_file)
    else:
        single_threaded(args.count, mp.cpu_count())


if __name__ == '__main__':
    main()