如何在 cython prange 块中使用 gsl 集成?

How to use gsl integration in a cython prange block?

为了使事情更通用,以 gsl_integration_qag 为例(因为它需要一个工作区来管理间隔)。

这是一个特意设计的简单例子(problem.pyx):

from libc.math cimport cos
from cython.parallel import prange
from cython_gsl cimport *

cdef double mean(double[:] arr):
    cdef Py_ssize_t i
    cdef double sum_ = 0.0
    for i in range(arr.shape[0]):
        sum_ += arr[i]
    return sum_ / arr.shape[0]

cdef double integrand(double x, void *params) nogil:
    cdef double a = (<double*>params)[0]
    cdef double b = (<double*>params)[1]
    return a*x + b

def use_gsl_integration_sequential():
    cdef double result, error
    cdef Py_ssize_t i, j
    cdef double result_sum=0.0
    cdef double results[3]
    cdef double a=0.0, b=0.0
    cdef double da = 0.1
    cdef size_t space_size = 1000

    cdef gsl_function gsl_func
    cdef double params[2]
    gsl_func.function = &integrand

    cdef gsl_integration_workspace *ws
    ws= gsl_integration_workspace_alloc(space_size)

    for i in range(3):
        a += da
        result_sum = 0.0 # reset the sum(thank J.J.Hakala for pointing out this)
        for j in range(4): # here may be range(100000) in practice!
            b = a + j
            params[0] = a; params[1] = b
            gsl_func.params = params
            gsl_integration_qag(&gsl_func, 0.0, 1.0, 1e-8, 1e-8, space_size, GSL_INTEG_GAUSS15, ws, &result, &error)
            result_sum += result
        results[i] = result_sum

    gsl_integration_workspace_free(ws)
    # do some other stuff with the 'results', for example:
    return mean(results)


# Now, I want to parallel the inner loop
def use_gsl_integration_parallel():
    cdef double result, error
    cdef Py_ssize_t i, j
    cdef double result_sum=0.0
    cdef double results[3]
    cdef double a, b
    cdef double da = 0.1
    cdef size_t space_size = 1000

    cdef gsl_function gsl_func
    cdef double params[2]
    gsl_func.function = &integrand

    cdef gsl_integration_workspace *ws
    ws= gsl_integration_workspace_alloc(space_size)

    for i in range(3):
        # a should be read-only in the follow prange block.
        a += da 
        # here may be prange(100000,...) in practice!
        for j in prange(4, nogil=True): 
            # b should be local private.
            b = a + j

            # params also should be local private
            params[0] = a; params[1] = b

            # gsl_func is shared, only its params differ.
            # According to the gsl manual(see follow link), workspaces should be allocated on a per-thread basis, but how?
            gsl_func.params = params
            gsl_integration_qag(&gsl_func, 0.0, 1.0, 1e-8, 1e-8, space_size, GSL_INTEG_GAUSS15, ws, &result, &error)

            result_sum += result

        results[i] = result_sum
        gsl_integration_workspace_free(ws)
    return mean(results)

代码有点长,只是为了完整(复制),但我觉得比较简单(阅读)(๑•ᴗ•๑)

然后编译 & link:

cython -a -2 problem.pyx

gcc -O3 -fopenmp -c problem.c -o problem.o -IC:\gsl2.1x86\include -IC:\python-2.7.10\include

gcc -shared -fPIC -fopenmp -o problem.pyd -LC:\gsl2.1x86\libs -LC:\python-2.7.10\libs problem.o -l:libgsl.a -l:libgslcblas.dll.a -l:libpython27.dll.a

在IPython中:

In [1]: from problem import *
In [2]: use_gsl_integration_sequential()
Out[2]: 7.2

use_gsl_integration_parallel()的结果是undefined,我试了几次,最多的时候会崩溃,运气好的话,得到一个undefined值,所以肯定有问题!我就是找不到这样的平行例子。有人会帮助我吗?

环境:

win10-64bit, gcc (tdm-1) 5.1.0 32bit, python-2.7.10 32bit, cython0.24, gsl-2.1

一些有用的参考资料?:

https://github.com/twiecki/CythonGSL

https://www.gnu.org/software/gsl/manual/html_node/Thread_002dsafety.html#Thread_002dsafety

(抱歉,无法 post 更多信息,两个 link 是我的极限...

当我测试该代码时,出现分段错误(linux,gcc 4.9.2)。

我认为原代码存在以下问题

  • 四个线程共享多个变量,每个线程都应该有自己的
  • 相同的工作 space 在这些线程中使用

以下版本给出7.2。顺序版本是否真的正确,因为它没有在

中设置 result_sum = 0.0
for i in range(3):
    a += da
    result_sum = 0.0
from openmp cimport omp_get_max_threads, omp_get_thread_num

def use_gsl_integration_parallel():
    cdef double result
    cdef Py_ssize_t i, j
    cdef double result_sum
    cdef double results[3]
    cdef double results2[4]
    cdef double a
    cdef double b[4]
    cdef double error[4]
    cdef double da = 0.1
    cdef size_t space_size = 1000

    cdef gsl_function gsl_func[4]
    cdef double params[4][2]
    cdef int tid

    cdef gsl_integration_workspace ** ws = <gsl_integration_workspace **>malloc(omp_get_max_threads() * sizeof(gsl_integration_workspace *))

    for j in range(omp_get_max_threads()):
        ws[j] = gsl_integration_workspace_alloc(space_size)

    for i in range(3):
        a += da

        for j in prange(4, nogil=True):
            tid = omp_get_thread_num()
            b[tid] = a + j

            params[tid][0] = a;
            params[tid][1] = b[tid]

            gsl_func[tid].function = &integrand
            gsl_func[tid].params = params[tid]
            gsl_integration_qag(&gsl_func[tid], 0.0, 1.0, 1e-8, 1e-8,
                                space_size, GSL_INTEG_GAUSS15, ws[tid],
                                &results2[j], &error[j])
            # printf("tid %d\n", tid)
        results[i] = sum(results2)

    for j in range(omp_get_max_threads()):
        gsl_integration_workspace_free(ws[j])
    free(ws)

    return mean(results)

感谢@J.J.Hakala,我终于弄清楚了如何让这些东西发挥作用。我改造了示例,并做了一些简单的配置文件。我只是post这里,希望有一天它能对一些科学计算的新手(比如我)有所帮助。

如有错误,请指正。谢谢!

# cython: cdivision=True
# cython: boundscheck=False
# cython: wraparound=False

from libc.stdlib cimport malloc, free
from libc.math cimport sin
from cython.parallel import prange
from cython_gsl cimport *
cimport openmp

cdef double arr_sum(double *arr, size_t arr_len):
    cdef size_t i
    cdef double sum_ = 0.0
    for i in range(arr_len):
        sum_ += arr[i]
    return sum_

cdef double arr_mean(double *arr, size_t arr_len):
    return arr_sum(arr, arr_len) / arr_len

# A random chosen integrand function for demo.
cdef double integrand(double x, void *params) nogil:
    cdef double a = (<double*>params)[0]
    cdef double b = (<double*>params)[1]
    return sin(a*x + b)

def sequential_solution(int outer_loops, int inner_loops):
    cdef double result, error
    cdef size_t i, j
    cdef double result_sum=0.0, mean_val
    cdef double *results = <double*>malloc(sizeof(double) * outer_loops)
    cdef double a=0.0, da = 0.1
    cdef size_t space_size = 1000
    cdef gsl_function gsl_func
    cdef double params[2]

    gsl_func.function = &integrand
    cdef gsl_integration_workspace *ws = gsl_integration_workspace_alloc(space_size)

    for i in range(outer_loops):
        a += da
        result_sum = 0.0
        for j in range(inner_loops):
            params[0] = a; params[1] = a + j
            gsl_func.params = params
            gsl_integration_qag(&gsl_func, 0.0, 1.0, 1e-8, 1e-8, space_size, GSL_INTEG_GAUSS15, ws, &result, &error)
            result_sum += result

        results[i] = result_sum

    mean_val = arr_mean(results, outer_loops)
    gsl_integration_workspace_free(ws)
    free(results)
    return mean_val


def parallel_solution(int outer_loops, int inner_loops, int num_threads):
    cdef size_t i, j
    cdef double a = 0.0
    cdef double da = 0.1
    cdef double mean_val = 0.0
    cdef size_t space_size = 1000

    # Heavy malloc!
    cdef double *outer_results = <double*>malloc(sizeof(double) * outer_loops)
    cdef double *inner_results = <double*>malloc(sizeof(double) * inner_loops)
    #cdef double *error = <double*>malloc(sizeof(double) * inner_loops)
    cdef double error # Ignore the integration abserror in parallel
    cdef gsl_function *gsl_func = <gsl_function*>malloc(sizeof(gsl_function) * inner_loops)
    cdef double *params = <double*>malloc(sizeof(double) * inner_loops * 2)
    cdef gsl_integration_workspace **ws = <gsl_integration_workspace**>malloc(sizeof(gsl_integration_workspace*) * num_threads)
    for i in range(num_threads):
        ws[i] = gsl_integration_workspace_alloc(space_size)

    for i in range(outer_loops):
        a += da
        for j in prange(inner_loops, nogil=True, num_threads=num_threads):
            error = 0 # local private.
            params[2*j] = a; params[2*j+1] = a+j
            gsl_func[j].function = &integrand
            gsl_func[j].params = params + 2*j

            # ws[openmp.omp_get_thread_num()] guarantees each thread shares it's own workspace,
            # since workspace is resuable in a thread. If two thread access the same workspace, data will corrupt.
            gsl_integration_qag(&gsl_func[j], 0.0, 1.0, 1e-8, 1e-8,
                                space_size, GSL_INTEG_GAUSS15, ws[openmp.omp_get_thread_num()],
                                &inner_results[j], &error)

        outer_results[i] = arr_sum(inner_results, inner_loops)

    mean_val = arr_mean(outer_results, outer_loops)

    # Now, free
    for i in range(num_threads):
        gsl_integration_workspace_free(ws[i])
    free(ws)
    free(params)
    free(gsl_func)
    #free(error)
    free(inner_results)
    free(outer_results)
    return mean_val

def parallel_solution_ver(int outer_loops, int inner_loops, int num_threads):
    cdef size_t i, j
    cdef double a = 0.0
    cdef double da = 0.1
    cdef double mean_val = 0.0
    cdef size_t space_size = 1000

    cdef double *outer_results = <double*>malloc(sizeof(double) * outer_loops)
    #cdef double *inner_results = <double*>malloc(sizeof(double) * inner_loops) 
    cdef double inner_result, inner_result_sum # no need to malloc a inner_results 
    #cdef double *error = <double*>malloc(sizeof(double) * inner_loops)
    cdef double error  # Ignore the integration abserror in parallel
    cdef gsl_function *gsl_func = <gsl_function*>malloc(sizeof(gsl_function) * inner_loops)
    cdef double *params = <double*>malloc(sizeof(double) * inner_loops * 2)
    cdef gsl_integration_workspace **ws = <gsl_integration_workspace**>malloc(sizeof(gsl_integration_workspace*) * num_threads)
    for i in range(num_threads):
        ws[i] = gsl_integration_workspace_alloc(space_size)

    for i in range(outer_loops):
        a += da
        inner_result_sum = 0.0  # reduction(+: inner_result_sum)
        for j in prange(inner_loops, nogil=True, num_threads=num_threads):
            inner_result = 0.0  # local private.
            error = 0  # local private.
            params[2*j] = a; params[2*j+1] = a+j
            gsl_func[j].function = &integrand
            gsl_func[j].params = params + 2*j

            # ws[openmp.omp_get_thread_num()] guarantees each thread shares it's own workspace,
            # since workspace is resuable in a thread. If two thread access the same workspace, data will corrupt.
            gsl_integration_qag(&gsl_func[j], 0.0, 1.0, 1e-8, 1e-8,
                                space_size, GSL_INTEG_GAUSS15, ws[openmp.omp_get_thread_num()],
                                &inner_result, &error)
            inner_result_sum += inner_result

        outer_results[i] = inner_result_sum

    mean_val = arr_mean(outer_results, outer_loops)

    # Now, free
    for i in range(num_threads):
        gsl_integration_workspace_free(ws[i])
    free(ws)
    free(params)
    free(gsl_func)
    #free(error)
    #free(inner_results)
    free(outer_results)
    return mean_val

更新:不需要 malloc 一个 results 数组,添加了一个更新的解决方案 parallel_solution_ver 我认为它更好,尽管它的效率几乎与 parallel_solution

在 IPython 笔记本中:

%matplotlib inline
from problem import *
import matplotlib.pyplot as plt

max_exp = 20

times_seq = []
for loops in [2**n for n in range(max_exp)]:
    res = %timeit -n 1 -r 5 -o sequential_solution(10, loops)
    times_seq.append((loops, res))

times_p = []
for loops in [2**n for n in range(max_exp)]:
    res = %timeit -n 1 -r 5 -o parallel_solution(10, loops, 8)
    times_p.append((loops, res))

times_p_ver = []
for loops in [2**n for n in range(max_exp)]:
    res = %timeit -n 1 -r 5 -o parallel_solution_ver(10, loops, 8)
    times_p_ver.append((loops, res))

time_results = [times_seq, times_p, times_p_ver]
labels = ["sequential solution", "parallel solution", "parallel solution_ver"]
colors = ['r', 'y', 'g']
markers = ['s', '+', 'o']
line_params = [
    {"sequential solution": {"color": "r", "marker": "s", "ms": 6}},
    {"parallel solution": {"color": "g", "marker": "+", "ms": 20, "mew": 1}},
    {"parallel solution_ver": {"color": "b", "marker": "o", "ms": 6}}
]

for results, param in zip(time_results, line_params):
    loops = [res[0] for res in results]
    times = [res[1].best for res in results]
    plt.loglog(loops, times, label=param.keys()[0], **param.values()[0])
plt.xlabel("inner loops")
plt.ylabel("exec time (ms)")
plt.legend(loc=0)

print "loops        sequential  parallel    parallel_ver"
for s, p, p_v in zip(times_seq, times_p, times_p):
    print "n = {:6d}   {:f}s   {:f}s   {:f}s".format(s[0], s[1].best, p[1].best, p_v[1].best)

# Same result? Yes!
print sequential_solution(10, 2**19)  # 0.0616505777102
print parallel_solution(10, 2**19, 8)  # 0.0616505777102
print parallel_solution_ver(10, 2**19, 8)  # 0.0616505777102

loops        sequential  parallel    parallel_ver
n =      1   0.000009s   0.001450s   0.001450s  
n =      2   0.000016s   0.001435s   0.001435s  
n =      4   0.000032s   0.001447s   0.001447s  
n =      8   0.000063s   0.001454s   0.001454s  
n =     16   0.000125s   0.001396s   0.001396s  
n =     32   0.000251s   0.001389s   0.001389s  
n =     64   0.000505s   0.001387s   0.001387s  
n =    128   0.001016s   0.001360s   0.001360s  
n =    256   0.002039s   0.001683s   0.001683s  
n =    512   0.003846s   0.001844s   0.001844s  
n =   1024   0.007717s   0.002416s   0.002416s  
n =   2048   0.015395s   0.003575s   0.003575s  
n =   4096   0.030817s   0.006055s   0.006055s  
n =   8192   0.061689s   0.010820s   0.010820s  
n =  16384   0.123445s   0.020872s   0.020872s  
n =  32768   0.246928s   0.040165s   0.040165s  
n =  65536   0.493903s   0.079077s   0.079077s  
n = 131072   0.987832s   0.156176s   0.156176s  
n = 262144   1.975107s   0.311151s   0.311151s  
n = 524288   3.949421s   0.617181s   0.617181s