Specific stdout line order with multi-processing

I have the following problem:

g data generators, each running as a separate Process, each producing n values which I need to write out in a round-robin fashion (across the g generators). So for generators A, B and C the output must be ordered like:

<value 1 of A>
<value 1 of B>
<value 1 of C>
<value 2 of A>
<value 2 of B>
<value 2 of C>
<value 3 of A>
...

Functionally it works, but it is still a lot slower than single-threaded. Since I need to generate a huge number of values, I'd like to parallelize the generation (and possibly the output as well, which I haven't managed so far).

Code (updated to use mp.Pipe and to include a separate data generator):

import random
import argparse
import multiprocessing as mp
import sys


class DataGenerator:
    _id = 0

    def __init__(self, id: int, **kwargs):
        self._id = id

    def generate(self):
        return '%03d:%4x' % (self._id, random.getrandbits(16))


def produce(generator, pipe, num: int):
    p_read, p_write = pipe
    i = 0
    while i < num:
        i += 1
        p_write.send(generator.generate())


def consume(pipes: list, num: int):
    i = 0
    p_count = len(pipes)

    while i < num:
        # enforce round-robin printing...
        p_idx = i % p_count
        p_read, p_write = pipes[p_idx]

        i += 1
        sys.stdout.write(p_read.recv() + '\n')


def multi_processed(num: int, processes: int):
    # ceil division: make sure the producers generate at least num values in total
    per_process = int(num / processes)
    if num % processes != 0:
        per_process += 1

    q = list()
    g = list()
    for i in range(processes):
        q.append(mp.Pipe(False))
        g.append(DataGenerator(i + 1))

    procs = list()
    for i in range(processes):
        p = mp.Process(target=produce, args=(g[i], q[i], per_process))
        p.start()
        procs.append(p)

    consume(q, num)

    for p in procs:
        p.join()


def single_threaded(num: int, processes: int):
    g = list()
    for i in range(processes):
        g.append(DataGenerator(i + 1))

    for i in range(num):
        j = i % processes
        print(g[j].generate())


def main():
    parser = argparse.ArgumentParser(description='Threading test')
    parser.add_argument(
        '--count', '-c', dest='count', type=int, default=1000000,
        help='How many total iterations (default: 1000000)')
    parser.add_argument(
        '--threads', '-t', dest='threads', type=int, default=1,
        help='how many threads to use (default: 1 - single-threaded)')
    args = parser.parse_args()

    if args.threads > 1:
        multi_processed(args.count, abs(args.threads))
    else:
        single_threaded(args.count, mp.cpu_count())


if __name__ == '__main__':
    main()

When executed, it occupies all 4 of my CPU cores, but performance-wise it is slower than sequential execution:

Execution time single-threaded for 10,000,000 total values:

$ time python3 threading_output.py --threads 1 --count 10000000 | wc -l
10000000

real    0m16.557s
user    0m16.443s
sys     0m0.437s

...and the same for the multiprocessing implementation:

$ time python3 threading_output.py --threads 4 --count 10000000 | wc -l
10000000

real    1m6.446s
user    3m10.073s
sys     0m54.274s

Not using mp.Queue and printing the generated values directly inside the produce loop gives me around 9.6 seconds, but then of course there is no deterministic order of the output lines.
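For reference, a minimal sketch of that direct-printing variant (a hypothetical produce_direct, not part of the code below):

def produce_direct(generator, num: int):
    # each producer writes straight to stdout: fast, but lines from
    # different processes interleave in arbitrary order
    for _ in range(num):
        print(generator.generate())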

How can I speed this up?

Update #1

Using mp.Array as a shared buffer is not an option, since for an array of strings I would need the ctype c_wchar_p, which according to the docs won't work in shared memory.
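To illustrate why (a sketch, not from the code above): a c_wchar_p element only stores a pointer, and the docs warn that a pointer stored in shared memory refers to the address space of the specific process that created it:

import ctypes
import multiprocessing as mp

# the shared array holds pointers, not string data; another process
# dereferencing them would be reading its own, unrelated address space
arr = mp.Array(ctypes.c_wchar_p, 4)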

Update #2

Replaced mp.Queue(1000) with mp.Pipe(False), which brings the time for 10 million values down to ~45 seconds. The producer processes now consume a lot less CPU, and the consumer is the clear bottleneck:

  PID USER      PR  NI    VIRT    RES    SHR S  %CPU  %MEM     TIME+ COMMAND
 5943 ancoron   20   0   28420  15168   8348 R  99.9   0.0   0:12.23 `- python3 threading_output.py --threads 4 --count 10000000
 5947 ancoron   20   0   28284  10336   3536 R  29.9   0.0   0:03.69     `- python3 threading_output.py --threads 4 --count 10000000
 5948 ancoron   20   0   28284  10336   3536 R  30.8   0.0   0:03.71     `- python3 threading_output.py --threads 4 --count 10000000
 5949 ancoron   20   0   28284  10336   3536 R  30.8   0.0   0:03.71     `- python3 threading_output.py --threads 4 --count 10000000
 5950 ancoron   20   0   28284  10340   3536 R  29.0   0.0   0:03.58     `- python3 threading_output.py --threads 4 --count 10000000

Update #3

I tried cinda, using its simple BytesQueue, which got the time down to ~23 seconds. Still slower than single-threaded.
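The queue setup, mirroring the calls used in the full test code further below (a named queue of 10000 fixed-size 16-byte entries):

from cinda.ipc.queue import BytesQueue as Queue
from cinda.ipc import free

# free any stale shared segment with this name, then create the queue
free('gen-0')
q = Queue('gen-0', 10000, 16)
q.put(b'\x00' * 16)
print(q.get().hex())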

OK, I did some testing and now I'm confused. I made both a multi-threaded and an async solution, and neither was particularly good. I also copy-pasted your code, and it always hangs, even when it is 'done'.

Note that in my code I used the number it is given as the TID instead of 4 random hex digits, since I wanted to make sure it was doing what you wanted. It's much harder to tell the other way, and it can easily be changed back to hex.

Single-threaded:

import random
import sys

def generate():
    return random.randrange(-10, 10)

if len(sys.argv) < 2:
    print("NEED ARGS")
    exit(0)

num = int(sys.argv[1])
for x in range(num):
    for _ in range(x):
        print("[{}]: {}".format(x, generate()))

Multi-threaded:

from concurrent.futures import TimeoutError
from pebble import ThreadPool, ProcessExpired
import random
import multiprocessing as mp
import sys 

def generate():
    return random.randrange(-10, 10) 

def produce(num):
    #tid = '%04x' % random.getrandbits(16)
    tid = num 
    i = 0 
    while i < num:
        print('[%s] %3d' % (tid, generate()))
        i += 1

if __name__ == "__main__":
    if len(sys.argv) < 2:
        print("NEED ARGS")
        exit(0)

    num = int(sys.argv[1])
    with ThreadPool(max_workers=mp.cpu_count()) as pool:
        future = pool.map(produce, range(num), timeout=3)
        iterator = future.result()
        while True:
            try:
                result = next(iterator)
            except StopIteration:
                break
            except TimeoutError as error:
                print(error)
                break
            except ProcessExpired as error:
                print(error)
                break
            except Exception as error:
                print(error)
                break

Honestly, I'm not seeing any big change in speed. The multiprocessing version is actually slower, and that is about as basic as it gets. What I just remembered is PyPy, which is known for its computational speed. I really don't want to set it up, but given the simple, repetitive and purely computational nature of your problem, I think it might be your solution.

The benchmarks are:

0.3 seconds for 100 iterations

10 seconds single-threaded, 11 seconds multi-threaded, for 1000 iterations

I gave up beyond that because of how long it took. I don't know how to describe it, but you are doing 100x more work for every added order of magnitude. Proof, using Gauss's pattern:

You are computing the sum of every number up to num, meaning 1 + 2 + ..., which Gauss's formula covers. This should give a rough idea of how big it gets:

10 as input takes 55 iterations

100 as input takes 5050 iterations

1000 as input takes 500500 iterations

10000 as input takes 50005000 iterations

After running the data through Excel, it's O(n^2), which I guess isn't too bad. The equation is ~0.55x^2, in case you're curious.
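A quick way to verify (a minimal sketch: loop_iterations mirrors the nested loop from the single-threaded snippet, while the figures above use the Gauss closed form n*(n+1)/2; both grow as ~0.5*n**2):

def loop_iterations(n: int) -> int:
    # count how many times the inner print() runs for input n
    return sum(x for x in range(n))

for n in (10, 100, 1000, 10000):
    # actual loop count vs. the Gauss closed form
    print(n, loop_iterations(n), n * (n + 1) // 2)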

Would you mind linking the other program variants you made, so that I can benchmark them against my own? Because honestly, I'd really like to know whether they work correctly / whether I'm doing lots of things wrong.

TL;DR: What tests/code are you using, so I can compare? Have you tried PyPy? Would a database be acceptable instead of printing the numbers (it would almost certainly be faster)? How did you manage to get your program to run single-threaded so fast?

Hope this helps!

Edit: Just to double-check, you do want to do the following, right? On the first iteration you print the ID and one random number once, on the second iteration you print the ID and a random number twice, and so on. Just checking.

Edit 2: The code should be fixed now.

from concurrent.futures import TimeoutError
from pebble import ThreadPool, ProcessExpired
import random
import multiprocessing as mp
import sys

def generate():
    return random.randrange(-10, 10)

def produce(num):
    tid = '%04x' % random.getrandbits(16)
    for _ in range(num):
        print('[%s] %3d' % (tid, generate()))

if __name__ == "__main__":
    if len(sys.argv) < 3:
        print("NEED ARGS")
        exit(0)

    num = int(sys.argv[1])
    workers = int(sys.argv[2])
    num_per_worker = int(num/workers)

    #The magic numbers here are just optimizations. Feel free to change them
    with ThreadPool(max_workers=workers, max_tasks=50) as pool:
        future = pool.map(produce, (num_per_worker for _ in range(workers)),
                          chunksize=round(num/1024))
        iterator = future.result()
        while True:
            try:
                result = next(iterator)
            except StopIteration:
                break

Edit 3: Round-robin

from pebble import ThreadPool, ProcessExpired
import random
import multiprocessing as mp
import sys 
from functools import partial

def generate():
    return random.randrange(-10, 10) 

def produce(num, magic_array):
    tid = '%04x' % random.getrandbits(16)
    for _ in range(num):
        magic_array.append('[%s] %3d' % (tid, generate()))

if __name__ == "__main__":
    if len(sys.argv) < 3:
        print("NEED ARGS")
        exit(0)

    num = int(sys.argv[1])
    workers = int(sys.argv[2])
    num_per_worker = int(num/workers)
    magic_array = []

    #This is the how the magic array is used as an argument.
    #There's probably a better way to do it, but I don't know it
    part_produce = partial(produce, magic_array=magic_array)
    #The magic numbers here are just optimizations. Feel free to change them
    with ThreadPool(max_workers=workers, max_tasks=50) as pool:
        future = pool.map(part_produce, (num_per_worker for _ in range(workers)), chunksize=num_per_worker)
        iterator = future.result()
        while True:
            try:
                result = next(iterator)
            except StopIteration:
                break

    #This is the important part. For every iteration/worker unit, it will go
    #through the list in steps of iteration/worker units, with a start offset
    #of x
    #Just printing takes about 5 seconds, but I don't think there's a faster
    #method because printing takes a long time anyway
    for x in range(num_per_worker):
        for y in magic_array[x::num_per_worker]:
            print(y)

After fiddling around with many different options, my (for now final) solution uses a memory-mapped file and no output to stdout at all.

After implementing a change to be closer to one of the real generator output types (a UUID: 36 characters + newline, built from a 16-byte value from the generator), the resulting numbers on my machine are:
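For clarity, this is the formatting involved (a standalone sketch of what the implementation further below does with each 16-byte value):

import random

# format 16 random bytes as a 36-character UUID-style string plus newline
raw = random.getrandbits(128).to_bytes(16, 'big')
hex_ = raw.hex()
line = '%8s-%4s-%4s-%4s-%12s\n' % (hex_[:8], hex_[8:12], hex_[12:16], hex_[16:20], hex_[20:])
assert len(line) == 37  # 36 characters + the newline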

Single-threaded

$ time python3 threading_output.py --threads 1 --count 10000000 | wc -l
10000000

real    0m15.915s
user    0m16.045s
sys     0m0.629s

Multiprocessing with cinda BytesQueue

$ time python3 threading_output.py --threads 4 --count 10000000 | wc -l
10000000

real    0m30.005s
user    0m53.543s
sys     0m28.072s

Multiprocessing with mmap

$ time python3 threading_output.py --threads 4 --count 10000000 --output-file test.txt

real    0m6.637s
user    0m18.688s
sys     0m1.265s

This looks a lot better: a speedup factor of about 2.4 (15.9 s down to 6.6 s). Although we use more memory here, the system should be able to handle that.

Details

The actual code of the mmap version uses slice-notation write access into the memory-mapped file, so that each process writes into its own "slots". To avoid using too much memory, each process also maps only a specific region of the output file (mmap.PAGESIZE * <bytes_per_entry> * 100 * <processes> bytes) and re-maps the next region after every mmap.PAGESIZE * 100 of its own entries.
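As a minimal illustration of that slice-notation write access (a sketch with a made-up file name and sizes, not the full remapping logic):

import mmap

# pre-allocate a small file, then write a fixed-size entry into one slot
with open('slots.bin', 'wb') as f:
    f.seek(4096 - 1)
    f.write(b'\x00')

with open('slots.bin', 'r+b') as f:
    mem = mmap.mmap(f.fileno(), 4096, access=mmap.ACCESS_WRITE)
    entry_size = 37
    slot = 3  # a process only ever touches its own slots
    mem[slot * entry_size:(slot + 1) * entry_size] = b'x' * 36 + b'\n'
    mem.flush()
    mem.close()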

Full test code

The mmap version uses the functions mmapped and produce_mm:

Implementation

import random
import argparse
import multiprocessing as mp
import sys
from cinda.ipc.queue import BytesQueue as Queue
from cinda.ipc import free
import mmap
import os


class DataGenerator:
    _id = 0

    def __init__(self, id: int, **kwargs):
        self._id = id

    def generate(self):
        return self._id.to_bytes(8, 'big') + random.getrandbits(64).to_bytes(8, 'big')


def produce(generator, q, num: int):
    i = 0
    while i < num:
        i += 1
        q.put(generator.generate())


def consume(queues: list, num: int):
    i = 0
    q_count = len(queues)

    while i < num:
        # enforce round-robin printing...
        q = queues[i % q_count]

        i += 1
        hex = q.get().hex()
        sys.stdout.write('%8s-%4s-%4s-%4s-%12s\n' % (hex[:8], hex[8:12], hex[12:16], hex[16:20], hex[20:]))


def multi_processed(num: int, processes: int):
    per_process = int(num / processes)
    if num % processes != 0:
        per_process += 1

    q = list()
    g = list()
    for i in range(processes):
        name = 'gen-%d' % i
        free(name)
        q.append(Queue(name, 10000, 16))
        g.append(DataGenerator(i + 1))

    procs = list()
    for i in range(processes):
        p = mp.Process(target=produce, args=(g[i], q[i], per_process))
        p.start()
        procs.append(p)

    consume(q, num)

    for p in procs:
        p.join()


def produce_mm(generator, out_file, num: int, offset: int, num_total: int, processes: int):
    entry_size = 37 # number of bytes per entry
    seek_relative = processes * entry_size # relative offset to next slot
    seek_offset = offset * entry_size # initial slot offset for this process
    buffer_switch = mmap.PAGESIZE * 100 # number of "my" entries per buffer
    buffer_num = buffer_switch * processes # number of entries to mmap at any time
    buffer_size = buffer_num * entry_size # actual mmap'd buffer size
    buffer_offset = 0

    size = num_total * entry_size

    with open(out_file, "r+b") as f:
        mem = mmap.mmap(f.fileno(), min(num_total * entry_size, buffer_size), access=mmap.ACCESS_WRITE)

        # generate and write the first entry (saves an if-clause later)
        hex = generator.generate().hex()
        mem[seek_offset:(seek_offset + entry_size)] = ('%8s-%4s-%4s-%4s-%12s\n' % (hex[:8], hex[8:12], hex[12:16], hex[16:20], hex[20:])).encode('US-ASCII')

        (i, j) = (1, 1)
        while i < num:
            # close current, create next buffer and reset entry offset
            if (i % buffer_switch) == 0:
                mem.flush()
                mem.close()
                buffer_offset += buffer_size
                buffer_size = min(size - buffer_offset, buffer_size)
                #sys.stderr.write('New buffer[%d] at %d\n' % (buffer_size, buffer_offset))
                mem = mmap.mmap(f.fileno(), buffer_size, access=mmap.ACCESS_WRITE, offset=buffer_offset)
                j = 0

            # calculate [start:end] offsets for this slot
            off_start = seek_relative * j + seek_offset
            off_end = off_start + entry_size
            if off_end > buffer_size:
                break

            hex = generator.generate().hex()
            try:
                mem[off_start:off_end] = ('%8s-%4s-%4s-%4s-%12s\n' % (hex[:8], hex[8:12], hex[12:16], hex[16:20], hex[20:])).encode('US-ASCII')
            except IndexError as e:
                sys.stderr.write('%s (tried [%d:%d] for mmap size %d, offset %d)\n' % (e, off_start, off_end, buffer_size, j))
                break

            (i, j) = (i + 1, j + 1)


def mmapped(num: int, processes: int, out_file):
    per_process = int(num / processes)
    if num % processes != 0:
        per_process += 1

    # pre-allocate the output file by seeking to the last byte and writing a NUL
    with open(out_file, "wb") as f:
        f.seek((num * 37 - 1), os.SEEK_SET)
        f.write(b'\x00')
        f.flush()

    g = list()
    for i in range(processes):
        g.append(DataGenerator(i + 1))

    procs = list()
    for i in range(processes):
        p = mp.Process(target=produce_mm, args=(g[i], out_file, per_process, i, num, processes))
        p.start()
        procs.append(p)

    for p in procs:
        p.join()


def single_threaded(num: int, processes: int):
    g = list()
    for i in range(processes):
        g.append(DataGenerator(i + 1))

    for i in range(num):
        j = i % processes
        hex = g[j].generate().hex()
        sys.stdout.write('%8s-%4s-%4s-%4s-%12s\n' % (hex[:8], hex[8:12], hex[12:16], hex[16:20], hex[20:]))


def main():
    parser = argparse.ArgumentParser(description='Threading test')
    parser.add_argument(
        '--count', '-c', dest='count', type=int, default=1000000,
        help='How many total iterations (default: 1000000)')
    parser.add_argument(
        '--threads', '-t', dest='threads', type=int, default=1,
        help='how many threads to use (default: 1 - single-threaded)')
    parser.add_argument(
        '--output-file', '-o', dest='out_file', type=str,
        help='specify output file to write into')
    args = parser.parse_args()

    if args.threads > 1:
        if args.out_file is None:
            multi_processed(args.count, abs(args.threads))
        else:
            mmapped(args.count, abs(args.threads), args.out_file)
    else:
        single_threaded(args.count, mp.cpu_count())


if __name__ == '__main__':
    main()