如何创建指向可与 std::nth_element 和 openMP 一起使用的 RcppEigen 矩阵的指针?
How can I create Pointers to an RcppEigen matrix that I can use with std::nth_element and openMP?
我正在尝试在 Rcpp 中实现一个函数,该函数将矩阵作为输入并计算用户为所述矩阵的行指定的分位数。由于我想使用 openMP,出于线程安全问题,我尝试使用 RcppEigen 来实现。
这看起来有点复杂的一个原因是为了有效地计算分位数,我试图模仿这种方法(finding quartiles,第一个答案),但允许用户输入。所以基本上我创建了一个向量,其索引对应于第一步中的分位数。在第二步中,我尝试访问 for 循环中的相应值。
这是我尝试的代码:
// // -*- mode: C++; c-indent-level: 4; c-basic-offset: 4; indent-tabs-mode: nil; -*-
// [[Rcpp::depends(RcppEigen)]]
#include <RcppEigen.h>
// [[Rcpp::plugins(openmp)]]
#ifdef _OPENMP
#include <omp.h>
#endif
// [[Rcpp::plugins(cpp11)]]
#include <random>
// [[Rcpp::export]]
SEXP summaryParC(const Eigen::MatrixXd x,
const Eigen::VectorXd quantiles,
int nrow, int ncol, const int ncores)
{
const int no_quantiles = quantiles.size();
Eigen::MatrixXd result(nrow, no_quantiles);
// this part is just to give me a vector of indices I need later on in the foor loop
//-----------------------------------------------
Eigen::VectorXi indices(no_quantiles +1);
indices[0] = -1;
for (int k=0; k<no_quantiles; k++){
if (quantiles[k] < 0.5){
indices[k+1] = floor(quantiles[k] * (ncol-1));
} else {
indices[k+1] = ceil(quantiles[k] * (ncol-1));
}
}
//-----------------------------------------------
#pragma omp parallel num_threads(ncores)
{
#pragma omp for
for(int i = 0; i < nrow; i++){
// I am trying to convert it into a vector so I can sort it
Eigen::VectorXd v = (x.row(i));
auto * ptr = v; // this fails
// here I want to use the pointer to access the n-th element of the vector
for(int q=0; q<no_quantiles; q++){ //quantiles
std::nth_element(ptr + indices[q] + 1, ptr + indices[q+1], ptr + ncol);
result(i,q) = *(ptr + indices[q+1]);
}
}
}
return Rcpp::wrap(result);
}
我想定义自己的指针的原因是 Eigen::VectorXd v 与 v.begin() 完全不同。如果没有 openMP,我会简单地将 x 定义为 NumericMatrix,将 v 定义为 NumericVector,一切正常。使用 openMP 我不能依赖它是线程安全的?
这适用于较小的数据集,但在较大的矩阵上使用时会崩溃:
// [[Rcpp::export]]
SEXP summaryC(NumericMatrix x,
NumericVector quantiles,
int nrow, int ncol, const int ncores)
{
const int no_quantiles = quantiles.size();
NumericMatrix result(nrow, no_quantiles);
int indices[no_quantiles +1];
//-----------------------------------------------
indices[0] = -1;
for (int k=0; k<no_quantiles; k++){
if (quantiles[k] < 0.5){
indices[k+1] = floor(quantiles[k] * (ncol-1));
} else {
indices[k+1] = ceil(quantiles[k] * (ncol-1));
}
}
//-----------------------------------------------
#pragma omp parallel num_threads(ncores)
{
#pragma omp for
for(int i = 0; i < nrow; i++){
// converting it into a vector so I can sort it
NumericVector v = (x.row(i));
for(int q=0; q<no_quantiles; q++){ //quantiles
std::nth_element(v.begin() + indices[q] + 1, v.begin() + indices[q+1], v.end());
result(i,q) = *(v.begin() + indices[q+1]);
}
}
}
return Rcpp::wrap(result);
}
非常感谢!
更新:
我实施了 Ralf Stubner 的方法。据我所知,指针工作正常。 (不幸的是,当我尝试 运行 时,R 仍然中止会话。正如 Dirk Eddelbuettel 指出的那样,使用指针并不能解决访问 R 内存的问题)。
// [[Rcpp::export]]
SEXP summaryParC(Eigen::MatrixXd x,
const Eigen::VectorXd quantiles,
int nrow, int ncol, const int ncores)
{
const int no_quantiles = quantiles.size();
Eigen::MatrixXd result(nrow, no_quantiles);
Eigen::VectorXi indices(no_quantiles +1);
indices[0] = -1;
for (int k=0; k<no_quantiles; k++){
if (quantiles[k] < 0.5){
indices[k+1] = floor(quantiles[k] * (ncol-1));
} else {
indices[k+1] = ceil(quantiles[k] * (ncol-1));
}
}
#pragma omp parallel num_threads(ncores)
{
#pragma omp for
for(int i = 0; i < nrow; i++){
Eigen::VectorXd v = (x.row(i));
double * B = v.data();
double * E = B + nrow;
for(int q=0; q<no_quantiles; q++){ //quantiles
std::nth_element(B + indices[q] + 1, B + indices[q+1], E);
result(i,q) = *(B + indices[q+1]);
}
}
}
return Rcpp::wrap(result);
}
第二次更新:这是一个更清晰的潜在问题示例。我知道使用 R 结构对 openMP 有问题,但也许这个例子可以更好地理解根本原因。
// [[Rcpp::plugins(openmp)]]
// [[Rcpp::plugins(cpp11)]]
#include <Rcpp.h>
#ifdef _OPENMP
#include <omp.h>
#endif
using namespace Rcpp;
// [[Rcpp::export]]
SEXP summaryC(NumericMatrix x,
int nrow, int ncol, const int ncores)
{
NumericMatrix result(nrow, 5);
int indices[6] = {-1, 0, 249, 500, 750, 999};
// #pragma omp parallel num_threads(ncores)
{
// #pragma omp for
for(int i = 0; i < nrow; i++){
NumericVector v = (x.row(i));
for(int q=0; q < 5; q++){
std::nth_element(v.begin() + indices[q] + 1, v.begin() + indices[q+1], v.end());
result(i,q) = *(v.begin() + indices[q+1]);
}
}
}
return Rcpp::wrap(result);
}
// [[Rcpp::export]]
SEXP summaryParC(NumericMatrix x,
int nrow, int ncol, const int ncores)
{
NumericMatrix result(nrow, 5);
int indices[6] = {-1, 0, 249, 500, 750, 999};
#pragma omp parallel num_threads(ncores)
{
#pragma omp for schedule(dynamic)
for(int i = 0; i < nrow; i++){
{
NumericVector v = (x.row(i));
for(int q=0; q<5; q++){
std::nth_element(v.begin() + indices[q] + 1, v.begin() + indices[q+1], v.end());
result(i,q) = *(v.begin() + indices[q+1]);
}
}
}
}
return Rcpp::wrap(result);
}
// [[Rcpp::export]]
SEXP summaryParCorder(NumericMatrix x,
int nrow, int ncol, const int ncores)
{
NumericMatrix result(nrow, 5);
int indices[6] = {-1, 0, 249, 500, 750, 999};
#pragma omp parallel num_threads(ncores)
{
#pragma omp for ordered schedule(dynamic)
for(int i = 0; i < nrow; i++){
#pragma omp ordered
{
NumericVector v = (x.row(i));
for(int q=0; q<5; q++){
std::nth_element(v.begin() + indices[q] + 1, v.begin() + indices[q+1], v.end());
result(i,q) = *(v.begin() + indices[q+1]);
}
}
}
}
return Rcpp::wrap(result);
}
***** R - code *****
#this works, but summaryParCorder is much slower.
mbm <- microbenchmark::microbenchmark(
summaryC(x = matrix(as.numeric(1:1000000), ncol = 1000),
nrow = 1000, ncol = 1000, ncores = 4),
summaryParCorder(x = matrix(as.numeric(1:1000000), ncol = 1000),
nrow = 1000, ncol = 1000, ncores = 4),
times = 20
)
mbm
# this breaks:
summaryParC(x = matrix(as.numeric(1:1000000), ncol = 1000),
nrow = 1000, ncol = 1000, ncores = 4)
我没有检查与 OpenMP 的兼容性,但是 Eigen::VectorXd::data()
为您提供了所需的指针,如果有问题的矢量不是 const
:
// [[Rcpp::depends(RcppEigen)]]
#include <RcppEigen.h>
// [[Rcpp::export]]
Eigen::VectorXd quantiles(Eigen::VectorXd x, const Eigen::VectorXi& indices) {
Eigen::VectorXd result(indices.size());
std::nth_element(x.data(), x.data() + indices[0], x.data() + x.size());
result(0) = x[indices[0]];
for (int i = 1; i < indices.size(); ++i) {
std::nth_element(x.data() + indices[i - 1] + 1,
x.data() + indices[i],
x.data() + x.size());
result(i) = x[indices[i]];
}
return result;
}
/*** R
set.seed(42)
x <- runif(12)
i <- sort(sample(seq_len(12), 3)) - 1
quantiles(x, i)
*/
这里有一个完整的解决方案,包括 OpenMP:
// [[Rcpp::plugins(openmp)]]
// [[Rcpp::plugins(cpp11)]]
// [[Rcpp::depends(RcppEigen)]]
#include <RcppEigen.h>
using namespace Rcpp;
// [[Rcpp::export]]
NumericMatrix summaryC(NumericMatrix x, int nrow, int ncores)
{
NumericMatrix result(nrow, 5);
int indices[6] = {-1, 0, 249, 500, 750, 999};
for (int i = 0; i < nrow; i++) {
NumericVector v = (x.row(i));
for (int q = 0; q < 5; ++q) {
std::nth_element(v.begin() + indices[q] + 1, v.begin() + indices[q+1], v.end());
result(i,q) = *(v.begin() + indices[q+1]);
}
}
return result;
}
// [[Rcpp::export]]
Eigen::MatrixXd summaryParC(Eigen::MatrixXd x,int nrow, int ncores) {
Eigen::MatrixXd result(nrow, 5);
int indices[6] = {-1, 0, 249, 500, 750, 999};
#pragma omp parallel num_threads(ncores)
{
#pragma omp for schedule(dynamic)
for (int i = 0; i < nrow; i++) {
Eigen::VectorXd v = x.row(i);
for (int q = 0; q < 5; ++q) {
std::nth_element(v.data() + indices[q] + 1,
v.data() + indices[q+1],
v.data() + v.size());
result(i,q) = v[indices[q+1]];
}
}
}
return result;
}
/*** R
x <- matrix(as.numeric(1:1000000), ncol = 1000)
microbenchmark::microbenchmark(
summaryC = summaryC(x = x, nrow = 1000, ncores = 4),
summaryParC = summaryParC(x = x, nrow = 1000, ncores = 4),
times = 100)
*/
我从未见过此并行版本崩溃。在我的双核机器上,它比串行代码快大约 44%。
我正在尝试在 Rcpp 中实现一个函数,该函数将矩阵作为输入并计算用户为所述矩阵的行指定的分位数。由于我想使用 openMP,出于线程安全问题,我尝试使用 RcppEigen 来实现。 这看起来有点复杂的一个原因是为了有效地计算分位数,我试图模仿这种方法(finding quartiles,第一个答案),但允许用户输入。所以基本上我创建了一个向量,其索引对应于第一步中的分位数。在第二步中,我尝试访问 for 循环中的相应值。
这是我尝试的代码:
// // -*- mode: C++; c-indent-level: 4; c-basic-offset: 4; indent-tabs-mode: nil; -*-
// [[Rcpp::depends(RcppEigen)]]
#include <RcppEigen.h>
// [[Rcpp::plugins(openmp)]]
#ifdef _OPENMP
#include <omp.h>
#endif
// [[Rcpp::plugins(cpp11)]]
#include <random>
// [[Rcpp::export]]
SEXP summaryParC(const Eigen::MatrixXd x,
const Eigen::VectorXd quantiles,
int nrow, int ncol, const int ncores)
{
const int no_quantiles = quantiles.size();
Eigen::MatrixXd result(nrow, no_quantiles);
// this part is just to give me a vector of indices I need later on in the foor loop
//-----------------------------------------------
Eigen::VectorXi indices(no_quantiles +1);
indices[0] = -1;
for (int k=0; k<no_quantiles; k++){
if (quantiles[k] < 0.5){
indices[k+1] = floor(quantiles[k] * (ncol-1));
} else {
indices[k+1] = ceil(quantiles[k] * (ncol-1));
}
}
//-----------------------------------------------
#pragma omp parallel num_threads(ncores)
{
#pragma omp for
for(int i = 0; i < nrow; i++){
// I am trying to convert it into a vector so I can sort it
Eigen::VectorXd v = (x.row(i));
auto * ptr = v; // this fails
// here I want to use the pointer to access the n-th element of the vector
for(int q=0; q<no_quantiles; q++){ //quantiles
std::nth_element(ptr + indices[q] + 1, ptr + indices[q+1], ptr + ncol);
result(i,q) = *(ptr + indices[q+1]);
}
}
}
return Rcpp::wrap(result);
}
我想定义自己的指针的原因是 Eigen::VectorXd v 与 v.begin() 完全不同。如果没有 openMP,我会简单地将 x 定义为 NumericMatrix,将 v 定义为 NumericVector,一切正常。使用 openMP 我不能依赖它是线程安全的?
这适用于较小的数据集,但在较大的矩阵上使用时会崩溃:
// [[Rcpp::export]]
SEXP summaryC(NumericMatrix x,
NumericVector quantiles,
int nrow, int ncol, const int ncores)
{
const int no_quantiles = quantiles.size();
NumericMatrix result(nrow, no_quantiles);
int indices[no_quantiles +1];
//-----------------------------------------------
indices[0] = -1;
for (int k=0; k<no_quantiles; k++){
if (quantiles[k] < 0.5){
indices[k+1] = floor(quantiles[k] * (ncol-1));
} else {
indices[k+1] = ceil(quantiles[k] * (ncol-1));
}
}
//-----------------------------------------------
#pragma omp parallel num_threads(ncores)
{
#pragma omp for
for(int i = 0; i < nrow; i++){
// converting it into a vector so I can sort it
NumericVector v = (x.row(i));
for(int q=0; q<no_quantiles; q++){ //quantiles
std::nth_element(v.begin() + indices[q] + 1, v.begin() + indices[q+1], v.end());
result(i,q) = *(v.begin() + indices[q+1]);
}
}
}
return Rcpp::wrap(result);
}
非常感谢!
更新:
我实施了 Ralf Stubner 的方法。据我所知,指针工作正常。 (不幸的是,当我尝试 运行 时,R 仍然中止会话。正如 Dirk Eddelbuettel 指出的那样,使用指针并不能解决访问 R 内存的问题)。
// [[Rcpp::export]]
SEXP summaryParC(Eigen::MatrixXd x,
const Eigen::VectorXd quantiles,
int nrow, int ncol, const int ncores)
{
const int no_quantiles = quantiles.size();
Eigen::MatrixXd result(nrow, no_quantiles);
Eigen::VectorXi indices(no_quantiles +1);
indices[0] = -1;
for (int k=0; k<no_quantiles; k++){
if (quantiles[k] < 0.5){
indices[k+1] = floor(quantiles[k] * (ncol-1));
} else {
indices[k+1] = ceil(quantiles[k] * (ncol-1));
}
}
#pragma omp parallel num_threads(ncores)
{
#pragma omp for
for(int i = 0; i < nrow; i++){
Eigen::VectorXd v = (x.row(i));
double * B = v.data();
double * E = B + nrow;
for(int q=0; q<no_quantiles; q++){ //quantiles
std::nth_element(B + indices[q] + 1, B + indices[q+1], E);
result(i,q) = *(B + indices[q+1]);
}
}
}
return Rcpp::wrap(result);
}
第二次更新:这是一个更清晰的潜在问题示例。我知道使用 R 结构对 openMP 有问题,但也许这个例子可以更好地理解根本原因。
// [[Rcpp::plugins(openmp)]]
// [[Rcpp::plugins(cpp11)]]
#include <Rcpp.h>
#ifdef _OPENMP
#include <omp.h>
#endif
using namespace Rcpp;
// [[Rcpp::export]]
SEXP summaryC(NumericMatrix x,
int nrow, int ncol, const int ncores)
{
NumericMatrix result(nrow, 5);
int indices[6] = {-1, 0, 249, 500, 750, 999};
// #pragma omp parallel num_threads(ncores)
{
// #pragma omp for
for(int i = 0; i < nrow; i++){
NumericVector v = (x.row(i));
for(int q=0; q < 5; q++){
std::nth_element(v.begin() + indices[q] + 1, v.begin() + indices[q+1], v.end());
result(i,q) = *(v.begin() + indices[q+1]);
}
}
}
return Rcpp::wrap(result);
}
// [[Rcpp::export]]
SEXP summaryParC(NumericMatrix x,
int nrow, int ncol, const int ncores)
{
NumericMatrix result(nrow, 5);
int indices[6] = {-1, 0, 249, 500, 750, 999};
#pragma omp parallel num_threads(ncores)
{
#pragma omp for schedule(dynamic)
for(int i = 0; i < nrow; i++){
{
NumericVector v = (x.row(i));
for(int q=0; q<5; q++){
std::nth_element(v.begin() + indices[q] + 1, v.begin() + indices[q+1], v.end());
result(i,q) = *(v.begin() + indices[q+1]);
}
}
}
}
return Rcpp::wrap(result);
}
// [[Rcpp::export]]
SEXP summaryParCorder(NumericMatrix x,
int nrow, int ncol, const int ncores)
{
NumericMatrix result(nrow, 5);
int indices[6] = {-1, 0, 249, 500, 750, 999};
#pragma omp parallel num_threads(ncores)
{
#pragma omp for ordered schedule(dynamic)
for(int i = 0; i < nrow; i++){
#pragma omp ordered
{
NumericVector v = (x.row(i));
for(int q=0; q<5; q++){
std::nth_element(v.begin() + indices[q] + 1, v.begin() + indices[q+1], v.end());
result(i,q) = *(v.begin() + indices[q+1]);
}
}
}
}
return Rcpp::wrap(result);
}
***** R - code *****
#this works, but summaryParCorder is much slower.
mbm <- microbenchmark::microbenchmark(
summaryC(x = matrix(as.numeric(1:1000000), ncol = 1000),
nrow = 1000, ncol = 1000, ncores = 4),
summaryParCorder(x = matrix(as.numeric(1:1000000), ncol = 1000),
nrow = 1000, ncol = 1000, ncores = 4),
times = 20
)
mbm
# this breaks:
summaryParC(x = matrix(as.numeric(1:1000000), ncol = 1000),
nrow = 1000, ncol = 1000, ncores = 4)
我没有检查与 OpenMP 的兼容性,但是 Eigen::VectorXd::data()
为您提供了所需的指针,如果有问题的矢量不是 const
:
// [[Rcpp::depends(RcppEigen)]]
#include <RcppEigen.h>
// [[Rcpp::export]]
Eigen::VectorXd quantiles(Eigen::VectorXd x, const Eigen::VectorXi& indices) {
Eigen::VectorXd result(indices.size());
std::nth_element(x.data(), x.data() + indices[0], x.data() + x.size());
result(0) = x[indices[0]];
for (int i = 1; i < indices.size(); ++i) {
std::nth_element(x.data() + indices[i - 1] + 1,
x.data() + indices[i],
x.data() + x.size());
result(i) = x[indices[i]];
}
return result;
}
/*** R
set.seed(42)
x <- runif(12)
i <- sort(sample(seq_len(12), 3)) - 1
quantiles(x, i)
*/
这里有一个完整的解决方案,包括 OpenMP:
// [[Rcpp::plugins(openmp)]]
// [[Rcpp::plugins(cpp11)]]
// [[Rcpp::depends(RcppEigen)]]
#include <RcppEigen.h>
using namespace Rcpp;
// [[Rcpp::export]]
NumericMatrix summaryC(NumericMatrix x, int nrow, int ncores)
{
NumericMatrix result(nrow, 5);
int indices[6] = {-1, 0, 249, 500, 750, 999};
for (int i = 0; i < nrow; i++) {
NumericVector v = (x.row(i));
for (int q = 0; q < 5; ++q) {
std::nth_element(v.begin() + indices[q] + 1, v.begin() + indices[q+1], v.end());
result(i,q) = *(v.begin() + indices[q+1]);
}
}
return result;
}
// [[Rcpp::export]]
Eigen::MatrixXd summaryParC(Eigen::MatrixXd x,int nrow, int ncores) {
Eigen::MatrixXd result(nrow, 5);
int indices[6] = {-1, 0, 249, 500, 750, 999};
#pragma omp parallel num_threads(ncores)
{
#pragma omp for schedule(dynamic)
for (int i = 0; i < nrow; i++) {
Eigen::VectorXd v = x.row(i);
for (int q = 0; q < 5; ++q) {
std::nth_element(v.data() + indices[q] + 1,
v.data() + indices[q+1],
v.data() + v.size());
result(i,q) = v[indices[q+1]];
}
}
}
return result;
}
/*** R
x <- matrix(as.numeric(1:1000000), ncol = 1000)
microbenchmark::microbenchmark(
summaryC = summaryC(x = x, nrow = 1000, ncores = 4),
summaryParC = summaryParC(x = x, nrow = 1000, ncores = 4),
times = 100)
*/
我从未见过此并行版本崩溃。在我的双核机器上,它比串行代码快大约 44%。