在 Python 中高效地执行映射

Execute mapping efficiently in Python

我有一些将数据映射到矩阵的代码...其中大部分都是设置好的,因此我的问题可以很容易地重现,但我唯一需要加快速度的部分是评论后的内容 # the part I want to speed up

import numpy as np
# config
matrix_height = 100
matrix_width  = 200

# fake data
x_data = np.array(range(10000))
y_data = [{i:i for i in range(100)}  for t in range(len(x_data))]

# fake mapping
x_to_index = {x: np.random.randint(matrix_width) for x in x_data }
y_to_index = {}
for y_dict in y_data:
  for y_key, y_val in y_dict.items():
    y_start = np.random.randint(matrix_height-2)
    y_to_index[y_key] = (y_start, y_start+2 )

# data that must be stored
total_matrix = np.zeros([matrix_height, matrix_width]).astype(int)
n_matrix     = np.zeros([matrix_height, matrix_width]).astype(int)
latest_value = np.zeros([matrix_height, matrix_width]).astype(int)

# the part I want to speed up
for x, y_dict in zip(x_data, y_data):
    x_index = x_to_index[x]
    for y_key, y_data in y_dict.items():
        y_slice = slice(*y_to_index[y_key])
        total_matrix[ y_slice, x_index ] += y_data
        latest_value[ y_slice, x_index ]  = y_data
        n_matrix[ y_slice, x_index ]     += 1

同样,我关注评论下方的部分# the part I want to speed up

我不确定如何加快速度,但似乎应该可以使用可以并行执行所有操作的映射函数...

我正在寻找最后一部分的显着改进。有什么想法吗?

根据核心数定制。

对于total_matrix,加法是可交换的。

对于 latest_value,以相反的顺序应用拆分列。

import numpy as np
import time
import multiprocessing as mp

def func(cols, zz, tm, nm, lv, setOrder):
    for c in cols:
        for t in zz:
            tm[slice(*t[0]), c] += t[1]
            lv[slice(*t[0]), c] = t[1]
            nm[slice(*t[0]), c] += 1
    return [tm, nm, lv, setOrder]

if __name__ == '__main__':
    mp.freeze_support()

    matrix_height = 100
    matrix_width = 200
    total_range = 10000

    x_data = np.array(range(total_range))
    y_data = [{i:i for i in range(matrix_height)} for t in range(len(x_data))]

    x_to_index = {x: np.random.randint(matrix_width) for x in x_data}
    # potential general purpose cols list
    #cols = np.random.randint(0, total_range, (1, total_range))[0]
    cols = [np.int(x_to_index[k]) for k in x_to_index]

    y_to_index = {}
    for y_dict in y_data:
        for y_key, y_val in y_dict.items():
            y_start = np.random.randint(matrix_height-2)
            y_to_index[y_key] = (y_start, y_start+2)

    # potential general purpose rows list
    #rows = [(np.random.randint(matrix_height), np.random.randint(matrix_height)) for x in range(matrix_height)]
    rows = [y_to_index[k] for k in y_to_index]

    # potential general purpose y data
    #y_dat = np.random.randint(0, matrix_height, (1, matrix_height))[0]
    y_dat = [i for i in range(matrix_height)]

    o_total_matrix = np.zeros([matrix_height, matrix_width]).astype(int)
    o_n_matrix     = np.zeros([matrix_height, matrix_width]).astype(int)
    o_latest_value = np.zeros([matrix_height, matrix_width]).astype(int)

    startTime = time.time()
    for x, y_dict in zip(x_data, y_data):
        x_index = x_to_index[x]
        for y_key, y_data in y_dict.items():
            y_slice = slice(*y_to_index[y_key])
            o_total_matrix[ y_slice, x_index ] += y_data
            o_latest_value[ y_slice, x_index ]  = y_data
            o_n_matrix[ y_slice, x_index ]     += 1
    print('original time was {0:5.2f} sec'.format(time.time() - startTime))

    procs = mp.cpu_count()

    i_tm = [np.zeros([matrix_height, matrix_width]).astype(int)] * procs
    i_nm = [np.zeros([matrix_height, matrix_width]).astype(int)] * procs
    i_lv = [np.zeros([matrix_height, matrix_width]).astype(int)] * procs

    zz = list(zip(rows, y_dat))

    procs_split = np.array_split(cols, procs)
    itup = []
    for x in range(procs):
        itup.append(((list(procs_split[x])), zz, i_tm[x], i_nm[x], i_lv[x], x))

    startTime = time.time()
    with mp.Pool(processes=procs) as pool:

        ret = pool.starmap(func, itup)
        i_total_matrix = ret[0][0]
        i_n_matrix = ret[0][1]
        for x in range(1, procs):
            i_total_matrix = np.add(i_total_matrix, ret[x][0])
            i_n_matrix = np.add(i_n_matrix, ret[x][1])

        colOrder = [0] * procs
        for x in range(procs):
            colOrder[x] = (procs-1) - ret[x][3]

        i_latest_value = ret[colOrder[0]][2]
        for x in range(1, procs):
            np.putmask(i_latest_value, i_latest_value == 0, ret[x][2])

    print('improved time was {0:5.2f} sec'.format(time.time() - startTime))
    comparison = i_total_matrix == o_total_matrix
    if not comparison.all():
        print('ERROR TOTAL MATRIX')
    comparison = i_n_matrix == o_n_matrix
    if not comparison.all():
        print('ERROR N MATRIX')
    comparison = i_latest_value == o_latest_value
    if not comparison.all():
        print('ERROR LATEST VALUE')

经过试用 运行,结果显示大约。快两倍:

原时间为 7.12 秒

改进的时间为 2.29 秒