Python:在 GPU 上将循环 numpy 数学函数重写为 运行

Python: rewrite a looping numpy math function to run on GPU

谁能帮我重写这个函数 doTheMath 函数) 来在 GPU 上进行计算?我用了好几天时间试图解决这个问题,但没有结果。我想知道也许有人可以帮我重写这个函数,以任何你认为适合的方式作为日志,因为我最后给出了相同的结果。我尝试使用 numba 中的 @jit 但由于某种原因,它实际上比 运行 像往常一样使用代码要慢得多。由于样本量很大,目标是大大减少执行时间,所以我自然相信 GPU 是最快的方法。

我将稍微解释一下实际发生的情况。真实数据看起来与在下面的代码中创建的样本数据几乎相同,它被分成样本大小,每个样本大约 5.000.000 行或每个文件大约 150MB。总共有大约 600.000.000 行或 20GB 的数据。我必须遍历这些数据,逐个采样,然后在每个样本中逐行采样,取每行的最后 2000(或其他)行和 运行 doTheMath 函数 [=61= 】 结果。然后将该结果保存回硬盘驱动器,在那里我可以用另一个程序用它做一些其他的事情。正如您在下面看到的,我不需要所有行的所有结果,只需要那些大于特定数量的结果。如果我 运行 我的函数现在在 python 中,我每 1.000.000 行得到大约 62 秒。考虑到所有数据以及处理数据的速度,这是一个很长的时间。

我必须提到,我在 data = joblib.load(file) 的帮助下逐个文件将真实数据文件上传到 RAM,因此上传数据不是问题,因为每个文件只需要大约 0.29 秒。上传后,我 运行 下面的整个代码。耗时最长的是 doTheMath 函数。我愿意将我在 Whosebug 上的所有 500 个声望点数作为奖励,奖励那些愿意帮助我在 GPU 上将这个简单代码重写为 运行 的人。我对 GPU 特别感兴趣,我真的很想看看它是如何解决手头的这个问题的。

EDIT/UPDATE 1: 这是一个 link 真实数据的小样本: data_csv.zip 大约 102000 行真实数据 1 和 2000 行真实数据 2a 和数据 2b。在真实样本数据

上使用 minimumLimit = 400

EDIT/UPDATE 2: 对于关注此 post 的人,这里是以下答案的简短摘要。到目前为止,我们对原始解决方案有 4 个答案。 @Divakar 提供的只是对原始代码的调整。在这两个调整中,只有第一个实际上适用于这个问题,第二个是一个很好的调整,但不适用于此处。在其他三个答案中,其中两个是基于 CPU 的解决方案,一个是 tensorflow-GPU 尝试。 Paul Panzer 的 Tensorflow-GPU 似乎很有前途,但当我实际上 运行 它在 GPU 上时它比原来的慢,所以代码仍然需要改进。

另外两个基于 CPU 的解决方案由@PaulPanzer(纯 numpy 解决方案)和@MSeifert(numba 解决方案)提交。与原始代码相比,这两种解决方案都提供了非常好的结果,并且处理数据的速度都非常快。在这两个中,Paul Panzer 提交的那个更快。它在大约 3 秒内处理了大约 1.000.000 行。唯一的问题是 batchSize 较小,这可以通过切换到 MSeifert 提供的 numba 解决方案来解决,甚至可以在下面讨论的所有调整后使用原始代码。

我非常高兴并感谢@PaulPanzer 和@MSeifert 为他们的答案所做的工作。尽管如此,由于这是一个关于基于 GPU 的解决方案的问题,我正在等着看是否有人愿意尝试使用 GPU 版本,看看与当前相比,GPU 上的数据处理速度有多快 CPU解决方案。如果没有其他答案胜过@PaulPanzer 的纯 numpy 解决方案,那么我将接受他的正确答案并获得赏金:)

EDIT/UPDATE 3: @Divakar post 为 GPU 提供了一个解决方案。经过我对真实数据的测试,速度甚至无法与 CPU 对应的解决方案相提并论。 GPU 在大约 1.5 秒内处理大约 5.000.000。这太不可思议了 :) 我对 GPU 解决方案感到非常兴奋,我感谢@Divakar posting 它。我还要感谢@PaulPanzer 和@MSeifert 的 CPU 解决方案 :) 由于 GPU,现在我的研究以惊人的速度继续进行 :)

import pandas as pd
import numpy as np
import time

def doTheMath(tmpData1, data2a, data2b):
    A = tmpData1[:, 0]
    B = tmpData1[:,1]
    C = tmpData1[:,2]
    D = tmpData1[:,3]
    Bmax = B.max()
    Cmin  = C.min()
    dif = (Bmax - Cmin)
    abcd = ((((A - Cmin) / dif) + ((B - Cmin) / dif) + ((C - Cmin) / dif) + ((D - Cmin) / dif)) / 4)
    return np.where(((abcd <= data2a) & (abcd >= data2b)), 1, 0).sum()

#Declare variables
batchSize = 2000
sampleSize = 5000000
resultArray = []
minimumLimit = 490 #use 400 on the real sample data 

#Create Random Sample Data
data1 = np.matrix(np.random.uniform(1, 100, (sampleSize + batchSize, 4)))
data2a = np.matrix(np.random.uniform(0, 1, (batchSize, 1))) #upper limit
data2b = np.matrix(np.random.uniform(0, 1, (batchSize, 1))) #lower limit
#approx. half of data2a will be smaller than data2b, but that is only in the sample data because it is randomly generated, NOT the real data. The real data2a is always higher than data2b.


#Loop through the data
t0 = time.time()
for rowNr in  range(data1.shape[0]):
    tmp_df = data1[rowNr:rowNr + batchSize] #rolling window
    if(tmp_df.shape[0] == batchSize):
        result = doTheMath(tmp_df, data2a, data2b)
        if (result >= minimumLimit):
            resultArray.append([rowNr , result])
print('Runtime:', time.time() - t0)

#Save data results
resultArray = np.array(resultArray)
print(resultArray[:,1].sum())
resultArray = pd.DataFrame({'index':resultArray[:,0], 'result':resultArray[:,1]})
resultArray.to_csv("Result Array.csv", sep=';')

我正在处理的 PC 规格:

GTX970(4gb) video card; 
i7-4790K CPU 4.00Ghz; 
16GB RAM;
a SSD drive 
running Windows 7; 

作为附带问题,SLI 中的第二个视频卡是否有助于解决此问题?

调整 #1

通常建议在使用 NumPy 数组时对事物进行矢量化。但是对于非常大的阵列,我认为你在那里没有选择。因此,为了提高性能,可以在求和的最后一步进行小的调整。

我们可以替换制作 1s0s 数组并求和的步骤:

np.where(((abcd <= data2a) & (abcd >= data2b)), 1, 0).sum()

with np.count_nonzero 有效计算布尔数组中的 True 值,而不是转换为 1s0s -

np.count_nonzero((abcd <= data2a) & (abcd >= data2b))

运行时测试 -

In [45]: abcd = np.random.randint(11,99,(10000))

In [46]: data2a = np.random.randint(11,99,(10000))

In [47]: data2b = np.random.randint(11,99,(10000))

In [48]: %timeit np.where(((abcd <= data2a) & (abcd >= data2b)), 1, 0).sum()
10000 loops, best of 3: 81.8 µs per loop

In [49]: %timeit np.count_nonzero((abcd <= data2a) & (abcd >= data2b))
10000 loops, best of 3: 28.8 µs per loop

调整 #2

在处理隐式广播的情况时使用 pre-computed 倒数。更多信息 。因此,存储 dif 的倒数并在步骤 :

中使用它
((((A  - Cmin) / dif) + ((B  - Cmin) / dif) + ...

样本测试-

In [52]: A = np.random.rand(10000)

In [53]: dif = 0.5

In [54]: %timeit A/dif
10000 loops, best of 3: 25.8 µs per loop

In [55]: %timeit A*(1.0/dif)
100000 loops, best of 3: 7.94 µs per loop

您有四个地方使用 dif 除法。所以,希望这也能带来明显的提升!

这在技术上是 off-topic(不是 GPU),但我相信你会感兴趣。

有一个明显且相当大的节省:

预计算A + B + C + D(不在循环中,在整个数据上:data1.sum(axis=-1)),因为abcd = ((A+B+C+D) - 4Cmin) / (4dif)。这将节省不少操作。

很惊讶之前没有人发现那个 ;-)

编辑:

还有一件事,虽然我怀疑那只是在你的例子中,而不是在你的真实数据中:

目前 data2a 的大约一半将小于 data2b。在这些地方,您在 abcd 上的条件不能同时为真,因此您甚至不需要在那里计算 abcd

编辑:

我在下面使用的另一个调整但忘了提及:如果您计算移动 window 上的最大值(或最小值)。例如,当您向右移动一点时,最大值发生变化的可能性有多大?只有两件事可以改变它:右边的新点更大(大约在 windowlength 时间发生一次,即使它发生了,你立即知道新的最大值)或者旧的最大值下降window 在左边(也大约在 windowlength 时间发生一次)。只有在最后一种情况下,您才需要在整个 window 中搜索新的最大值。

编辑:

忍不住在 tensorflow 中尝试一下。我没有 GPU,所以你必须自己测试它的速度。将 "gpu" for "cpu" 放在标记的行上。

在 cpu 上,它的速度大约是您最初实施的一半(即没有 Divakar 的调整)。请注意,我冒昧地将输入从矩阵更改为普通数组。目前 tensorflow 有点不稳定,因此请确保您拥有正确的版本。我使用 Python3.6 和 tf 0.12.1 如果你今天安装了 pip3 tensorflow-gpu 它 应该 可能工作。

import numpy as np
import time
import tensorflow as tf

# currently the max/min code is sequential
# thus
parallel_iterations = 1
# but you can put this in a separate loop, precompute and then try and run
# the remainder of doTheMathTF with a larger parallel_iterations

# tensorflow is quite capricious about its data types
ddf = tf.float64
ddi = tf.int32

def worker(data1, data2a, data2b):
    ###################################
    # CHANGE cpu to gpu in next line! #
    ###################################
    with tf.device('/cpu:0'):
        g = tf.Graph ()
        with g.as_default():
            ABCD = tf.constant(data1.sum(axis=-1), dtype=ddf)
            B = tf.constant(data1[:, 1], dtype=ddf)
            C = tf.constant(data1[:, 2], dtype=ddf)
            window = tf.constant(len(data2a))
            N = tf.constant(data1.shape[0] - len(data2a) + 1, dtype=ddi)
            data2a = tf.constant(data2a, dtype=ddf)
            data2b = tf.constant(data2b, dtype=ddf)
            def doTheMathTF(i, Bmax, Bmaxind, Cmin, Cminind, out):
                # most of the time we can keep the old max/min
                Bmaxind = tf.cond(Bmaxind<i,
                                  lambda: i + tf.to_int32(
                                      tf.argmax(B[i:i+window], axis=0)),
                                  lambda: tf.cond(Bmax>B[i+window-1], 
                                                  lambda: Bmaxind, 
                                                  lambda: i+window-1))
                Cminind = tf.cond(Cminind<i,
                                  lambda: i + tf.to_int32(
                                      tf.argmin(C[i:i+window], axis=0)),
                                  lambda: tf.cond(Cmin<C[i+window-1],
                                                  lambda: Cminind,
                                                  lambda: i+window-1))
                Bmax = B[Bmaxind]
                Cmin = C[Cminind]
                abcd = (ABCD[i:i+window] - 4 * Cmin) * (1 / (4 * (Bmax-Cmin)))
                out = out.write(i, tf.to_int32(
                    tf.count_nonzero(tf.logical_and(abcd <= data2a,
                                                    abcd >= data2b))))
                return i + 1, Bmax, Bmaxind, Cmin, Cminind, out
            with tf.Session(graph=g) as sess:
                i, Bmaxind, Bmax, Cminind, Cmin, out = tf.while_loop(
                    lambda i, _1, _2, _3, _4, _5: i<N, doTheMathTF,
                    (tf.Variable(0, dtype=ddi), tf.Variable(0.0, dtype=ddf),
                     tf.Variable(-1, dtype=ddi),
                     tf.Variable(0.0, dtype=ddf), tf.Variable(-1, dtype=ddi),
                     tf.TensorArray(ddi, size=N)),
                    shape_invariants=None,
                    parallel_iterations=parallel_iterations,
                    back_prop=False)
                out = out.pack()
                sess.run(tf.initialize_all_variables())
                out, = sess.run((out,))
    return out

#Declare variables
batchSize = 2000
sampleSize = 50000#00
resultArray = []

#Create Sample Data
data1 = np.random.uniform(1, 100, (sampleSize + batchSize, 4))
data2a = np.random.uniform(0, 1, (batchSize,))
data2b = np.random.uniform(0, 1, (batchSize,))

t0 = time.time()
out = worker(data1, data2a, data2b)
print('Runtime (tensorflow):', time.time() - t0)


good_indices, = np.where(out >= 490)
res_tf = np.c_[good_indices, out[good_indices]]

def doTheMath(tmpData1, data2a, data2b):
    A = tmpData1[:, 0]
    B  = tmpData1[:,1]
    C   = tmpData1[:,2]
    D = tmpData1[:,3]
    Bmax = B.max()
    Cmin  = C.min()
    dif = (Bmax - Cmin)
    abcd = ((((A  - Cmin) / dif) + ((B  - Cmin) / dif) + ((C   - Cmin) / dif) + ((D - Cmin) / dif)) / 4)
    return np.where(((abcd <= data2a) & (abcd >= data2b)), 1, 0).sum()

#Loop through the data
t0 = time.time()
for rowNr in  range(sampleSize+1):
    tmp_df = data1[rowNr:rowNr + batchSize] #rolling window
    result = doTheMath(tmp_df, data2a, data2b)
    if (result >= 490):
        resultArray.append([rowNr , result])
print('Runtime (original):', time.time() - t0)
print(np.alltrue(np.array(resultArray)==res_tf))

在开始调整目标 (GPU) 或使用其他任何东西(即并行执行)之前,您可能需要考虑如何改进现有代码。您使用了 标签,所以我将使用它来改进代码:首先我们对数组而不是矩阵进行操作:

data1 = np.array(np.random.uniform(1, 100, (sampleSize + batchSize, 4)))
data2a = np.array(np.random.uniform(0, 1, batchSize)) #upper limit
data2b = np.array(np.random.uniform(0, 1, batchSize)) #lower limit

每次调用 doTheMath 时,您都希望返回一个整数,但是您使用了很多数组并创建了很多中间数组:

abcd = ((((A  - Cmin) / dif) + ((B  - Cmin) / dif) + ((C   - Cmin) / dif) + ((D - Cmin) / dif)) / 4)
return np.where(((abcd <= data2a) & (abcd >= data2b)), 1, 0).sum()

每一步创建一个中间数组:

  • tmp1 = A-Cmin,
  • tmp2 = tmp1 / dif
  • tmp3 = B - Cmin,
  • tmp4 = tmp3 / dif
  • ...你明白要点了。

但是这是一个reduce函数(数组->整数)所以有很多中间数组是不必要的权重,只计算"fly".

的值
import numba as nb

@nb.njit
def doTheMathNumba(tmpData, data2a, data2b):
    Bmax = np.max(tmpData[:, 1])
    Cmin = np.min(tmpData[:, 2])
    diff = (Bmax - Cmin)
    idiff = 1 / diff
    sum_ = 0
    for i in range(tmpData.shape[0]):
        val = (tmpData[i, 0] + tmpData[i, 1] + tmpData[i, 2] + tmpData[i, 3]) / 4 * idiff - Cmin * idiff
        if val <= data2a[i] and val >= data2b[i]:
            sum_ += 1
    return sum_

我在这里做了一些其他的事情来避免多次操作:

(((A - Cmin) / dif) + ((B - Cmin) / dif) + ((C - Cmin) / dif) + ((D - Cmin) / dif)) / 4
= ((A - Cmin + B - Cmin + C - Cmin + D - Cmin) / dif) / 4
= (A + B + C + D - 4 * Cmin) / (4 * dif)
= (A + B + C + D) / (4 * dif) - (Cmin / dif)

这实际上将我计算机上的执行时间缩短了将近 10 倍:

%timeit doTheMath(tmp_df, data2a, data2b)       # 1000 loops, best of 3: 446 µs per loop
%timeit doTheMathNumba(tmp_df, data2a, data2b)  # 10000 loops, best of 3: 59 µs per loop

当然还有其他的改进,比如使用滚动min/max来计算BmaxCmin,那至少会使得部分计算运行在 O(sampleSize) 而不是 O(samplesize * batchsize)。这也使得重用某些 (A + B + C + D) / (4 * dif) - (Cmin / dif) 计算成为可能,因为如果 CminBmax 不为下一个样本更改,这些值就不会不同。做起来有点复杂,因为比较不同。但绝对有可能!看这里:

import time
import numpy as np
import numba as nb

@nb.njit
def doTheMathNumba(abcd, data2a, data2b, Bmax, Cmin):
    diff = (Bmax - Cmin)
    idiff = 1 / diff
    quarter_idiff = 0.25 * idiff
    sum_ = 0
    for i in range(abcd.shape[0]):
        val = abcd[i] * quarter_idiff - Cmin * idiff
        if val <= data2a[i] and val >= data2b[i]:
            sum_ += 1
    return sum_

@nb.njit
def doloop(data1, data2a, data2b, abcd, Bmaxs, Cmins, batchSize, sampleSize, minimumLimit, resultArray):
    found = 0
    for rowNr in range(data1.shape[0]):
        if(abcd[rowNr:rowNr + batchSize].shape[0] == batchSize):
            result = doTheMathNumba(abcd[rowNr:rowNr + batchSize], 
                                    data2a, data2b, Bmaxs[rowNr], Cmins[rowNr])
            if (result >= minimumLimit):
                resultArray[found, 0] = rowNr
                resultArray[found, 1] = result
                found += 1
    return resultArray[:found]

#Declare variables
batchSize = 2000
sampleSize = 50000
resultArray = []
minimumLimit = 490 #use 400 on the real sample data 

data1 = np.array(np.random.uniform(1, 100, (sampleSize + batchSize, 4)))
data2a = np.array(np.random.uniform(0, 1, batchSize)) #upper limit
data2b = np.array(np.random.uniform(0, 1, batchSize)) #lower limit

from scipy import ndimage
t0 = time.time()
abcd = np.sum(data1, axis=1)
Bmaxs = ndimage.maximum_filter1d(data1[:, 1], 
                                 size=batchSize, 
                                 origin=-((batchSize-1)//2-1))  # correction for even shapes
Cmins = ndimage.minimum_filter1d(data1[:, 2], 
                                 size=batchSize, 
                                 origin=-((batchSize-1)//2-1))

result = np.zeros((sampleSize, 2), dtype=np.int64)
doloop(data1, data2a, data2b, abcd, Bmaxs, Cmins, batchSize, sampleSize, minimumLimit, result)
print('Runtime:', time.time() - t0)

这给了我一个 Runtime: 0.759593152999878(在 numba 编译函数之后!),而你原来的有 Runtime: 24.68975639343262。现在我们快了 30 倍!

根据你的样本量,它仍然需要 Runtime: 60.187848806381226,但这还不错,对吧?

即使我自己没有这样做, 说可以写 "Numba for CUDA GPUs" 而且看起来并不复杂.

下面是一些代码,用于演示只需调整算法即可实现的功能。它是纯粹的 numpy,但在您发布的样本数据上,比原始版本提速大约 35 倍(在我相当普通的机器上,约 2.5 秒内有约 1,000,000 个样本):

>>> result_dict = master('run')
[('load', 0.82578349113464355), ('precomp', 0.028138399124145508), ('max/min', 0.24333405494689941), ('ABCD', 0.015314102172851562), ('main', 1.3356468677520752)]
TOTAL 2.44821691513

使用的调整:

  • A+B+C+D,看我另一个答案
  • 运行 min/max,包括避免使用相同的 Cmin/dif.[=21= 多次计算 (A+B+C+D - 4Cmin)/(4dif) ]

这些或多或少都是例行公事。这留下了与 data2a/b 的比较,这是昂贵的 O(NK),其中 N 是样本数,K 是 window 的大小。 这里可以利用相对 well-behaved 的数据。使用 运行 min/max 可以创建 data2a/b 的变体,可用于一次测试一系列 window 偏移量,如果测试失败,所有这些偏移量都可以立即排除,否则范围平分。

import numpy as np
import time

# global variables; they will hold the precomputed pre-screening filters
preA, preB = {}, {}
CHUNK_SIZES = None

def sliding_argmax(data, K=2000):
    """compute the argmax of data over a sliding window of width K

    returns:
        indices  -- indices into data
        switches -- window offsets at which the maximum changes
                    (strictly speaking: where the index of the maximum changes)
                    excludes 0 but includes maximum offset (len(data)-K+1)

    see last line of compute_pre_screening_filter for a recipe to convert
    this representation to the vector of maxima
    """
    N = len(data)
    last = np.argmax(data[:K])
    indices = [last]
    while indices[-1] <= N - 1:
        ge = np.where(data[last + 1 : last + K + 1] > data[last])[0]
        if len(ge) == 0:
            if last + K >= N:
                break
            last += 1 + np.argmax(data[last + 1 : last + K + 1])
            indices.append(last)
        else:
            last += 1 + ge[0]
            indices.append(last)
    indices = np.array(indices)
    switches = np.where(data[indices[1:]] > data[indices[:-1]],
                        indices[1:] + (1-K), indices[:-1] + 1)
    return indices, np.r_[switches, [len(data)-K+1]]


def compute_pre_screening_filter(bound, n_offs):
    """compute pre-screening filter for point-wise upper bound

    given a K-vector of upper bounds B and K+n_offs-1-vector data
    compute K+n_offs-1-vector filter such that for each index j
    if for any offset 0 <= o < n_offs and index 0 <= i < K such that
    o + i = j, the inequality B_i >= data_j holds then filter_j >= data_j

    therefore the number of data points below filter is an upper bound for
    the maximum number of points below bound in any K-window in data
    """
    pad_l, pad_r = np.min(bound[:n_offs-1]), np.min(bound[1-n_offs:])
    padded = np.r_[pad_l+np.zeros(n_offs-1,), bound, pad_r+np.zeros(n_offs-1,)]
    indices, switches = sliding_argmax(padded, n_offs)
    return padded[indices].repeat(np.diff(np.r_[[0], switches]))


def compute_all_pre_screening_filters(upper, lower, min_chnk=5, dyads=6):
    """compute upper and lower pre-screening filters for data blocks of
    sizes K+n_offs-1 where
    n_offs = min_chnk, 2min_chnk, ..., 2^(dyads-1)min_chnk

    the result is stored in global variables preA and preB
    """
    global CHUNK_SIZES

    CHUNK_SIZES = min_chnk * 2**np.arange(dyads)
    preA[1] = upper
    preB[1] = lower
    for n in CHUNK_SIZES:
        preA[n] = compute_pre_screening_filter(upper, n)
        preB[n] = -compute_pre_screening_filter(-lower, n)


def test_bounds(block, counts, threshold=400):
    """test whether the windows fitting in the data block 'block' fall
    within the bounds using pre-screening for efficient bulk rejection

    array 'counts' will be overwritten with the counts of compliant samples
    note that accurate counts will only be returned for above threshold
    windows, because the analysis of bulk rejected windows is short-circuited

    also note that bulk rejection only works for 'well behaved' data and
    for example not on random numbers
    """
    N = len(counts)
    K = len(preA[1])
    r = N % CHUNK_SIZES[0]
    # chop up N into as large as possible chunks with matching pre computed
    # filters
    # start with small and work upwards
    counts[:r] = [np.count_nonzero((block[l:l+K] <= preA[1]) &
                                   (block[l:l+K] >= preB[1]))
                  for l in range(r)]

    def bisect(block, counts):
        M = len(counts)
        cnts = np.count_nonzero((block <= preA[M]) & (block >= preB[M]))
        if cnts < threshold:
            counts[:] = cnts
            return
        elif M == CHUNK_SIZES[0]:
            counts[:] = [np.count_nonzero((block[l:l+K] <= preA[1]) &
                                          (block[l:l+K] >= preB[1]))
                         for l in range(M)]
        else:
            M //= 2
            bisect(block[:-M], counts[:M])
            bisect(block[M:], counts[M:])

    N = N // CHUNK_SIZES[0]
    for M in CHUNK_SIZES:
        if N % 2:
            bisect(block[r:r+M+K-1], counts[r:r+M])
            r += M
        elif N == 0:
            return
        N //= 2
    else:
        for j in range(2*N):
            bisect(block[r:r+M+K-1], counts[r:r+M])
            r += M


def analyse(data, use_pre_screening=True, min_chnk=5, dyads=6,
            threshold=400):
    samples, upper, lower = data
    N, K = samples.shape[0], upper.shape[0]
    times = [time.time()]
    if use_pre_screening:
        compute_all_pre_screening_filters(upper, lower, min_chnk, dyads)
    times.append(time.time())
    # compute switching points of max and min for running normalisation
    upper_inds, upper_swp = sliding_argmax(samples[:, 1], K)
    lower_inds, lower_swp = sliding_argmax(-samples[:, 2], K)
    times.append(time.time())
    # sum columns
    ABCD = samples.sum(axis=-1)
    times.append(time.time())
    counts = np.empty((N-K+1,), dtype=int)
    # main loop
    # loop variables:
    offs = 0
    u_ind, u_scale, u_swp = 0, samples[upper_inds[0], 1], upper_swp[0]
    l_ind, l_scale, l_swp = 0, samples[lower_inds[0], 2], lower_swp[0]
    while True:
        # check which is switching next, min(C) or max(B)
        if u_swp > l_swp:
            # greedily take the largest block possible such that dif and Cmin
            # do not change
            block = (ABCD[offs:l_swp+K-1] - 4*l_scale) \
                    * (0.25 / (u_scale-l_scale))
            if use_pre_screening:
                test_bounds(block, counts[offs:l_swp], threshold=threshold)
            else:
                counts[offs:l_swp] = [
                    np.count_nonzero((block[l:l+K] <= upper) &
                                     (block[l:l+K] >= lower))
                    for l in range(l_swp - offs)]
            # book keeping
            l_ind += 1
            offs = l_swp
            l_swp = lower_swp[l_ind]
            l_scale = samples[lower_inds[l_ind], 2]
        else:
            block = (ABCD[offs:u_swp+K-1] - 4*l_scale) \
                    * (0.25 / (u_scale-l_scale))
            if use_pre_screening:
                test_bounds(block, counts[offs:u_swp], threshold=threshold)
            else:
                counts[offs:u_swp] = [
                    np.count_nonzero((block[l:l+K] <= upper) &
                                     (block[l:l+K] >= lower))
                    for l in range(u_swp - offs)]
            u_ind += 1
            if u_ind == len(upper_inds):
                assert u_swp == N-K+1
                break
            offs = u_swp
            u_swp = upper_swp[u_ind]
            u_scale = samples[upper_inds[u_ind], 1]
    times.append(time.time())
    return {'counts': counts, 'valid': np.where(counts >= 400)[0],
            'timings': np.diff(times)}


def master(mode='calibrate', data='fake', use_pre_screening=True, nrep=3,
           min_chnk=None, dyads=None):
    t = time.time()
    if data in ('fake', 'load'):
        data1 = np.loadtxt('data1.csv', delimiter=';', skiprows=1,
                           usecols=[1,2,3,4])
        data2a = np.loadtxt('data2a.csv', delimiter=';', skiprows=1,
                            usecols=[1])
        data2b = np.loadtxt('data2b.csv', delimiter=';', skiprows=1,
                            usecols=[1])
        if data == 'fake':
            data1 = np.tile(data1, (10, 1))
        threshold = 400
    elif data == 'random':
        data1 = np.random.random((102000, 4))
        data2b = np.random.random(2000)
        data2a = np.random.random(2000)
        threshold = 490
        if use_pre_screening or mode == 'calibrate':
            print('WARNING: pre-screening not efficient on artificial data')
    else:
        raise ValueError("data mode {} not recognised".format(data))
    data = data1, data2a, data2b
    t_load = time.time() - t
    if mode == 'calibrate':
        min_chnk = (2, 3, 4, 5, 6) if min_chnk is None else min_chnk
        dyads = (0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10) if dyads is None else dyads
        timings = np.zeros((len(min_chnk), len(dyads)))
        print('max bisect  ' + ' '.join([
            '   n.a.' if dy == 0 else '{:7d}'.format(dy) for dy in dyads]),
              end='')
        for i, mc in enumerate(min_chnk):
            print('\nmin chunk {}'.format(mc), end=' ')
            for j, dy in enumerate(dyads):
                for k in range(nrep):
                    if dy == 0: # no pre-screening
                        timings[i, j] += analyse(
                            data, False, mc, dy, threshold)['timings'][3]
                    else:
                        timings[i, j] += analyse(
                            data, True, mc, dy, threshold)['timings'][3]
                timings[i, j] /= nrep
                print('{:7.3f}'.format(timings[i, j]), end=' ', flush=True)
        best_mc, best_dy = np.unravel_index(np.argmin(timings.ravel()),
                                            timings.shape)
        print('\nbest', min_chnk[best_mc], dyads[best_dy])
        return timings, min_chnk[best_mc], dyads[best_dy]
    if mode == 'run':
        min_chnk = 2 if min_chnk is None else min_chnk
        dyads = 5 if dyads is None else dyads
        res = analyse(data, use_pre_screening, min_chnk, dyads, threshold)
        times = np.r_[[t_load], res['timings']]
        print(list(zip(('load', 'precomp', 'max/min', 'ABCD', 'main'), times)))
        print('TOTAL', times.sum())
        return res

简介及解决代码

嗯,你自找的!因此,在此 post 中列出的是 PyCUDA 的实现,它使用轻量级包装器在 Python 环境中扩展了 CUDA 的大部分功能。我们将使用它的 SourceModule 功能,让我们在 Python 环境中编写和编译 CUDA 内核。

进入正题,在涉及的计算中,我们有滑动最大值和最小值,很少有差异和划分和比较。对于最大和最小部分,涉及块最大查找(对于每个滑动 window),我们将使用 reduction-technique 详细讨论 here. This would be done at block level. For the upper level iterations across sliding windows, we would use the grid level indexing into CUDA resources. For more info on this block and grid format, please refer to page-18。 PyCUDA 还支持用于计算缩减的内置函数,如最大和最小,但我们失去了控制,特别是我们打算使用专用内存,如共享内存和常量内存,以在接近最佳水平的情况下利用 GPU。

列出 PyCUDA-NumPy 解决方案代码 -

1] PyCUDA 部分 -

import pycuda.autoinit
import pycuda.driver as drv
import numpy as np
from pycuda.compiler import SourceModule

mod = SourceModule("""
#define TBP 1024 // THREADS_PER_BLOCK

__device__ void get_Bmax_Cmin(float* out, float *d1, float *d2, int L, int offset)
{
    int tid = threadIdx.x;
    int inv = TBP;
    __shared__ float dS[TBP][2];

    dS[tid][0] = d1[tid+offset];  
    dS[tid][1] = d2[tid+offset];         
    __syncthreads();

    if(tid<L-TBP)  
    {
        dS[tid][0] = fmaxf(dS[tid][0] , d1[tid+inv+offset]);
        dS[tid][1] = fminf(dS[tid][1] , d2[tid+inv+offset]);
    }
    __syncthreads();
    inv = inv/2;

    while(inv!=0)   
    {
        if(tid<inv)
        {
            dS[tid][0] = fmaxf(dS[tid][0] , dS[tid+inv][0]);
            dS[tid][1] = fminf(dS[tid][1] , dS[tid+inv][1]);
        }
        __syncthreads();
        inv = inv/2;
    }
    __syncthreads();

    if(tid==0)
    {
        out[0] = dS[0][0];
        out[1] = dS[0][1];
    }   
    __syncthreads();
}

__global__ void main1(float* out, float *d0, float *d1, float *d2, float *d3, float *lowL, float *highL, int *BLOCKLEN)
{
    int L = BLOCKLEN[0];
    int tid = threadIdx.x;
    int iterID = blockIdx.x;
    float Bmax_Cmin[2];
    int inv;
    float Cmin, dif;   
    __shared__ float dS[TBP*2];   

    get_Bmax_Cmin(Bmax_Cmin, d1, d2, L, iterID);  
    Cmin = Bmax_Cmin[1];
    dif = (Bmax_Cmin[0] - Cmin);

    inv = TBP;

    dS[tid] = (d0[tid+iterID] + d1[tid+iterID] + d2[tid+iterID] + d3[tid+iterID] - 4.0*Cmin) / (4.0*dif);
    __syncthreads();

    if(tid<L-TBP)  
        dS[tid+inv] = (d0[tid+inv+iterID] + d1[tid+inv+iterID] + d2[tid+inv+iterID] + d3[tid+inv+iterID] - 4.0*Cmin) / (4.0*dif);                   

     dS[tid] = ((dS[tid] >= lowL[tid]) & (dS[tid] <= highL[tid])) ? 1 : 0;
     __syncthreads();

     if(tid<L-TBP)
         dS[tid] += ((dS[tid+inv] >= lowL[tid+inv]) & (dS[tid+inv] <= highL[tid+inv])) ? 1 : 0;
     __syncthreads();

    inv = inv/2;
    while(inv!=0)   
    {
        if(tid<inv)
            dS[tid] += dS[tid+inv];
        __syncthreads();
        inv = inv/2;
    }

    if(tid==0)
        out[iterID] = dS[0];
    __syncthreads();

}
""")

请注意THREADS_PER_BLOCK, TBP是根据batchSize设置的。这里的经验法则是将 2 的幂值分配给 TBP,该值刚好小于 batchSize。因此,对于 batchSize = 2000,我们需要 TBP 作为 1024.

2] NumPy 部分 -

def gpu_app_v1(A, B, C, D, batchSize, minimumLimit):
    func1 = mod.get_function("main1")
    outlen = len(A)-batchSize+1

    # Set block and grid sizes
    BSZ = (1024,1,1)
    GSZ = (outlen,1)

    dest = np.zeros(outlen).astype(np.float32)
    N = np.int32(batchSize)
    func1(drv.Out(dest), drv.In(A), drv.In(B), drv.In(C), drv.In(D), \
                     drv.In(data2b), drv.In(data2a),\
                     drv.In(N), block=BSZ, grid=GSZ)
    idx = np.flatnonzero(dest >= minimumLimit)
    return idx, dest[idx]

基准测试

我在GTX 960M上测试过。请注意 PyCUDA 期望数组是连续的。因此,我们需要对列进行切片并进行复制。我 expecting/assuming 可以从文件中读取数据,这样数据就可以沿着行而不是列分布。因此,暂时将它们排除在基准测试功能之外。

原始方法-

def org_app(data1, batchSize, minimumLimit):
    resultArray = []
    for rowNr in  range(data1.shape[0]-batchSize+1):
        tmp_df = data1[rowNr:rowNr + batchSize] #rolling window
        result = doTheMath(tmp_df, data2a, data2b)
        if (result >= minimumLimit):
            resultArray.append([rowNr , result]) 
    return resultArray

时间和验证 -

In [2]: #Declare variables
   ...: batchSize = 2000
   ...: sampleSize = 50000
   ...: resultArray = []
   ...: minimumLimit = 490 #use 400 on the real sample data
   ...: 
   ...: #Create Random Sample Data
   ...: data1 = np.random.uniform(1, 100000, (sampleSize + batchSize, 4)).astype(np.float32)
   ...: data2b = np.random.uniform(0, 1, (batchSize)).astype(np.float32)
   ...: data2a = data2b + np.random.uniform(0, 1, (batchSize)).astype(np.float32)
   ...: 
   ...: # Make column copies
   ...: A = data1[:,0].copy()
   ...: B = data1[:,1].copy()
   ...: C = data1[:,2].copy()
   ...: D = data1[:,3].copy()
   ...: 
   ...: gpu_out1,gpu_out2 = gpu_app_v1(A, B, C, D, batchSize, minimumLimit)
   ...: cpu_out1,cpu_out2 = np.array(org_app(data1, batchSize, minimumLimit)).T
   ...: print(np.allclose(gpu_out1, cpu_out1))
   ...: print(np.allclose(gpu_out2, cpu_out2))
   ...: 
True
False

因此,CPU 和 GPU 计数之间存在一些差异。让我们调查一下 -

In [7]: idx = np.flatnonzero(~np.isclose(gpu_out2, cpu_out2))

In [8]: idx
Out[8]: array([12776, 15208, 17620, 18326])

In [9]: gpu_out2[idx] - cpu_out2[idx]
Out[9]: array([-1., -1.,  1.,  1.])

有四个 non-matching 计数实例。这些最多关闭 1。经过研究,我发现了一些这方面的信息。基本上,因为我们使用数学内在函数进行最大和最小计算,我认为那些导致浮动 pt 表示中的最后一个二进制位与 CPU 对应的不同。这被称为 ULP 错误,并已详细讨论 here and here

最后,抛开这个问题,让我们来看看最重要的一点,性能 -

In [10]: %timeit org_app(data1, batchSize, minimumLimit)
1 loops, best of 3: 2.18 s per loop

In [11]: %timeit gpu_app_v1(A, B, C, D, batchSize, minimumLimit)
10 loops, best of 3: 82.5 ms per loop

In [12]: 2180.0/82.5
Out[12]: 26.424242424242426

让我们尝试使用更大的数据集。使用 sampleSize = 500000,我们得到 -

In [14]: %timeit org_app(data1, batchSize, minimumLimit)
1 loops, best of 3: 23.2 s per loop

In [15]: %timeit gpu_app_v1(A, B, C, D, batchSize, minimumLimit)
1 loops, best of 3: 821 ms per loop

In [16]: 23200.0/821
Out[16]: 28.25822168087698

因此,加速比保持在 27.

左右

限制:

1) 我们使用 float32 数字,因为 GPU 最适合这些数字。在性能方面,non-server GPU 上的双精度特别不受欢迎,因为您使用的是这样的 GPU,所以我用 float32 进行了测试。

进一步改进:

1) 我们可以使用更快的 constant memory 来输入 data2adata2b,而不是使用 global memory.