CUDA /推力中分段数据的矢量化上限
vectorized upper bound for segmented data in CUDA / thrust
我有以下输入数据:
e = 0 0 0 0 0 0 | 1 1 1
t = 1 1 4 4 4 5 | 1 6 7
i = 0 1 2 3 4 5 | 6 7 8 // indices from [0,n-1]
数据首先按 e
排序,然后按 t
排序。 e
是标识数据段的键。在这种情况下:
segment_0 = [0,5]
segment_1 = [6,8]
每个片段再次被 t
分割。在这种情况下:
sub_segment_0_0 = [0,1] // t==1
sub_segment_0_1 = [2,4] // t==4
sub_segment_0_2 = [5,5] // t==5
sub_segment_1_0 = [6,6] // t==1
sub_segment_1_1 = [7,7] // t==6
sub_segment_1_2 = [8,8] // t==7
我想创建以下输出序列:
f = 2 2 5 5 5 6 | 7 8 9
l = 6 6 6 6 6 6 | 9 9 9
f
包含当前段中下一个 sub_segment 的起始索引。
l
包含(当前段中最后一个 sub_segment 的结束索引)+ 1.
对于每个段的最后 sub_segment,两个值都应指向其结束索引。
为了计算 f
,我尝试使用 thrust::upper_bound
,但这仅在我只有一个 sub_segment 时有效:
#include <thrust/host_vector.h>
#include <thrust/copy.h>
#include <thrust/binary_search.h>
#include <thrust/device_vector.h>
#include <stdint.h>
#include <iostream>
#define PRINTER(name) print(#name, (name))
template <template <typename...> class V, typename T, typename ...Args>
void print(const char* name, const V<T,Args...> & v)
{
std::cout << name << ":\t";
thrust::copy(v.begin(), v.end(), std::ostream_iterator<T>(std::cout, "\t"));
std::cout << std::endl;
}
int main()
{
uint32_t e[] = {0,0,0,0,0,0};
uint32_t t[] = {1,1,4,4,4,5};
uint32_t i[] = {0,1,2,3,4,5};
int size = sizeof(i)/sizeof(i[0]);
typedef thrust::host_vector<uint32_t> HVec;
typedef thrust::device_vector<uint32_t> DVec;
HVec h_i(i,i+size);
HVec h_e(e,e+size);
HVec h_t(t,t+size);
DVec d_i = h_i;
DVec d_e = h_e;
DVec d_t = h_t;
PRINTER(d_e);
PRINTER(d_t);
PRINTER(d_i);
DVec upper(size);
thrust::upper_bound(d_t.begin(), d_t.end(), d_t.begin(), d_t.end(), upper.begin());
PRINTER(upper);
return 0;
}
输出:
d_e: 0 0 0 0 0 0
d_t: 1 1 4 4 4 5
d_i: 0 1 2 3 4 5
upper: 2 2 5 5 5 6
如果我使用包含两个 sub_segment 的输入数据,它将不再起作用,因为没有 thrust::upper_bound_by_key
:
// replace in the code above
uint32_t e[] = {0,0,0,0,0,0,1,1,1};
uint32_t t[] = {1,1,4,4,4,5,1,6,7};
uint32_t i[] = {0,1,2,3,4,5,6,7,8};
输出
d_e: 0 0 0 0 0 0 1 1 1
d_t: 1 1 4 4 4 5 1 6 7
d_i: 0 1 2 3 4 5 6 7 8
upper: 2 2 7 7 7 7 2 8 9
如何为我的数据实施 upper_bound_by_key
?
我怎样才能有效地计算 l
?
我对任何解决方案持开放态度,推力不是必需的。
这是一种可能的方法:
标记 (t-) 片段的结尾。我假设一个 e-segment 可能有一个 t-segment。如果是这种情况,那么相邻的 e 段可能具有相同数值(大概为 1)的 t 段。因此标记段的结尾需要同时考虑e
和t
。我使用的方法基本上类似于相邻差异,除了它使用 thrust::transform
和 e
和 t
.[=28= 的移位表示同时考虑 e
和 t
]
确定 f
将为每个段保留的值。现在我们知道每个 (t-) 段的结尾,我们可以简单地从 i
中选择下一个值(使用 copy_if
,并将段结束标记作为我们的模板)作为 f
前一段的值。为了方便这一点,并且由于您的 i
只是一个索引序列,我创建了一个 i
向量,它比您显示的内容长一个元素。
为每个段创建一个数值递增的索引。这只是对步骤 1 中创建的矢量的独占扫描。
使用第 3 步中创建的索引序列,将第 2 步中创建的 f
段值"scatter",放入我们的 f
结果中("scatter" 是用 thrust::copy
和一个置换迭代器完成的)。
这是一个有效的例子,借用了你的代码:
$ cat t835.cu
#include <thrust/host_vector.h>
#include <thrust/copy.h>
#include <thrust/device_vector.h>
#include <stdint.h>
#include <iostream>
#include <thrust/transform.h>
#include <thrust/iterator/zip_iterator.h>
#include <thrust/iterator/permutation_iterator.h>
#include <thrust/sequence.h>
#include <thrust/scan.h>
using namespace thrust::placeholders;
struct my_semarker_func
{
template <typename T>
__host__ __device__
uint32_t operator()(const T &d1, const T &d2){
if (thrust::get<0>(d1) != thrust::get<0>(d2)) return 1;
if (thrust::get<1>(d1) != thrust::get<1>(d2)) return 1;
return 0;}
};
#define PRINTER(name) print(#name, (name))
template <template <typename...> class V, typename T, typename ...Args>
void print(const char* name, const V<T,Args...> & v)
{
std::cout << name << ":\t";
thrust::copy(v.begin(), v.end(), std::ostream_iterator<T>(std::cout, "\t"));
std::cout << std::endl;
}
int main()
{
uint32_t e[] = {0,0,0,0,0,0,1,1,1};
uint32_t t[] = {1,1,4,4,4,5,1,6,7};
int size = sizeof(t)/sizeof(t[0]);
typedef thrust::host_vector<uint32_t> HVec;
typedef thrust::device_vector<uint32_t> DVec;
HVec h_e(e,e+size);
HVec h_t(t,t+size);
DVec d_i(size+1);
DVec d_e = h_e;
DVec d_t = h_t;
thrust::sequence(d_i.begin(), d_i.end());
PRINTER(d_e);
PRINTER(d_t);
PRINTER(d_i);
// create segment end markers
DVec d_s(size,1);
thrust::transform(thrust::make_zip_iterator(thrust::make_tuple(d_e.begin(), d_t.begin())), thrust::make_zip_iterator(thrust::make_tuple(d_e.end()-1, d_t.end()-1)), thrust::make_zip_iterator(thrust::make_tuple(d_e.begin()+1, d_t.begin()+1)), d_s.begin(), my_semarker_func());
// create segment f values
DVec d_g(size);
thrust::copy_if(d_i.begin()+1, d_i.end(), d_s.begin(), d_g.begin(), _1 == 1);
// create segment indices
DVec d_h(size);
thrust::exclusive_scan(d_s.begin(), d_s.end(), d_h.begin());
// create f
DVec d_f(size);
thrust::copy_n(thrust::make_permutation_iterator(d_g.begin(), d_h.begin()), size, d_f.begin());
PRINTER(d_f);
return 0;
}
$ nvcc -std=c++11 -o t835 t835.cu
$ ./t835
d_e: 0 0 0 0 0 0 1 1 1
d_t: 1 1 4 4 4 5 1 6 7
d_i: 0 1 2 3 4 5 6 7 8 9
d_f: 2 2 5 5 5 6 7 8 9
$
可以使用非常相似的序列来创建 l
向量。
我找到了另一种方法来做到这一点。
为了能够使用 lower_bound
,我需要确保 t
是 全局 排序的。为了做到这一点,我首先找出每个
sub_segment 使用 adjacent_difference
。之后,scatter_if
从 counting_iterator
为子段的每个起点复制递增的数字。最后,inclusive_scan
为每个子段传播相同的值。我将 inclusive_scan
之前的两个步骤组合到自定义仿函数 my_scatter
中以实现更好的内核融合。
现在 upper_bound
应用于这些全局递增的值以计算 f
。
l
可以通过在 e
上应用 upper_bound
来计算。
我不确定我的方法的效率与@RobertCrovella 提出的方法相比如何。
输出:
d_e: 0 0 0 0 0 0 1 1 1
d_t: 1 1 4 4 4 5 1 6 7
d_i: 0 1 2 3 4 5 6 7 8
norm_t: 0 0 2 2 2 7 13 20 28
d_f: 2 2 5 5 5 6 7 8 9
d_l: 6 6 6 6 6 6 9 9 9
#include <thrust/host_vector.h>
#include <thrust/copy.h>
#include <thrust/binary_search.h>
#include <thrust/device_vector.h>
#include <thrust/iterator/counting_iterator.h>
#include <thrust/iterator/transform_iterator.h>
#include <thrust/adjacent_difference.h>
#include <thrust/functional.h>
#include <stdint.h>
#include <iostream>
#include <thrust/scatter.h>
#include <thrust/scan.h>
#include <thrust/transform.h>
#define PRINTER(name) print(#name, (name))
template <template <typename...> class V, typename T, typename ...Args>
void print(const char* name, const V<T,Args...> & v)
{
std::cout << name << ":\t";
thrust::copy(v.begin(), v.end(), std::ostream_iterator<T>(std::cout, "\t"));
std::cout << std::endl;
}
template <typename IteratorType, typename IndexType = uint32_t>
struct my_scatter : public thrust::unary_function<IndexType,IndexType>
{
my_scatter(IteratorType first) : first(first)
{
}
__host__ __device__
IndexType operator()(const IndexType& i)
{
IndexType result = i;
if (i > static_cast<IndexType>(0) && *(first+i) == *(first+i-static_cast<IndexType>(1)))
{
result = static_cast<IndexType>(0);
}
return result;
}
IteratorType first;
};
template <typename IteratorType>
my_scatter<IteratorType> make_my_scatter(IteratorType first)
{
return my_scatter<IteratorType>(first);
}
int main()
{
uint32_t e[] = {0,0,0,0,0,0,1,1,1};
uint32_t t[] = {1,1,4,4,4,5,1,6,7};
uint32_t i[] = {0,1,2,3,4,5,6,7,8};
int size = sizeof(i)/sizeof(i[0]);
typedef thrust::host_vector<uint32_t> HVec;
typedef thrust::device_vector<uint32_t> DVec;
HVec h_i(i,i+size);
HVec h_e(e,e+size);
HVec h_t(t,t+size);
DVec d_i = h_i;
DVec d_e = h_e;
DVec d_t = h_t;
PRINTER(d_e);
PRINTER(d_t);
PRINTER(d_i);
DVec norm_t(size);
auto my_scatter_op = make_my_scatter(zip(d_e.begin(), d_t.begin()));
auto ti_begin = thrust::make_transform_iterator(thrust::make_counting_iterator(0), my_scatter_op);
auto ti_end = thrust::make_transform_iterator(thrust::make_counting_iterator(size), my_scatter_op);
thrust::inclusive_scan(ti_begin, ti_end, norm_t.begin());
PRINTER(norm_t);
DVec d_f(size);
thrust::upper_bound(norm_t.begin(), norm_t.end(), norm_t.begin(), norm_t.end(), d_f.begin());
PRINTER(d_f);
DVec d_l(size);
thrust::upper_bound(d_e.begin(), d_e.end(), d_e.begin(), d_e.end(), d_l.begin());
PRINTER(d_l);
return 0;
}
我有以下输入数据:
e = 0 0 0 0 0 0 | 1 1 1
t = 1 1 4 4 4 5 | 1 6 7
i = 0 1 2 3 4 5 | 6 7 8 // indices from [0,n-1]
数据首先按 e
排序,然后按 t
排序。 e
是标识数据段的键。在这种情况下:
segment_0 = [0,5]
segment_1 = [6,8]
每个片段再次被 t
分割。在这种情况下:
sub_segment_0_0 = [0,1] // t==1
sub_segment_0_1 = [2,4] // t==4
sub_segment_0_2 = [5,5] // t==5
sub_segment_1_0 = [6,6] // t==1
sub_segment_1_1 = [7,7] // t==6
sub_segment_1_2 = [8,8] // t==7
我想创建以下输出序列:
f = 2 2 5 5 5 6 | 7 8 9
l = 6 6 6 6 6 6 | 9 9 9
f
包含当前段中下一个 sub_segment 的起始索引。
l
包含(当前段中最后一个 sub_segment 的结束索引)+ 1.
对于每个段的最后 sub_segment,两个值都应指向其结束索引。
为了计算 f
,我尝试使用 thrust::upper_bound
,但这仅在我只有一个 sub_segment 时有效:
#include <thrust/host_vector.h>
#include <thrust/copy.h>
#include <thrust/binary_search.h>
#include <thrust/device_vector.h>
#include <stdint.h>
#include <iostream>
#define PRINTER(name) print(#name, (name))
template <template <typename...> class V, typename T, typename ...Args>
void print(const char* name, const V<T,Args...> & v)
{
std::cout << name << ":\t";
thrust::copy(v.begin(), v.end(), std::ostream_iterator<T>(std::cout, "\t"));
std::cout << std::endl;
}
int main()
{
uint32_t e[] = {0,0,0,0,0,0};
uint32_t t[] = {1,1,4,4,4,5};
uint32_t i[] = {0,1,2,3,4,5};
int size = sizeof(i)/sizeof(i[0]);
typedef thrust::host_vector<uint32_t> HVec;
typedef thrust::device_vector<uint32_t> DVec;
HVec h_i(i,i+size);
HVec h_e(e,e+size);
HVec h_t(t,t+size);
DVec d_i = h_i;
DVec d_e = h_e;
DVec d_t = h_t;
PRINTER(d_e);
PRINTER(d_t);
PRINTER(d_i);
DVec upper(size);
thrust::upper_bound(d_t.begin(), d_t.end(), d_t.begin(), d_t.end(), upper.begin());
PRINTER(upper);
return 0;
}
输出:
d_e: 0 0 0 0 0 0
d_t: 1 1 4 4 4 5
d_i: 0 1 2 3 4 5
upper: 2 2 5 5 5 6
如果我使用包含两个 sub_segment 的输入数据,它将不再起作用,因为没有 thrust::upper_bound_by_key
:
// replace in the code above
uint32_t e[] = {0,0,0,0,0,0,1,1,1};
uint32_t t[] = {1,1,4,4,4,5,1,6,7};
uint32_t i[] = {0,1,2,3,4,5,6,7,8};
输出
d_e: 0 0 0 0 0 0 1 1 1
d_t: 1 1 4 4 4 5 1 6 7
d_i: 0 1 2 3 4 5 6 7 8
upper: 2 2 7 7 7 7 2 8 9
如何为我的数据实施 upper_bound_by_key
?
我怎样才能有效地计算 l
?
我对任何解决方案持开放态度,推力不是必需的。
这是一种可能的方法:
标记 (t-) 片段的结尾。我假设一个 e-segment 可能有一个 t-segment。如果是这种情况,那么相邻的 e 段可能具有相同数值(大概为 1)的 t 段。因此标记段的结尾需要同时考虑
e
和t
。我使用的方法基本上类似于相邻差异,除了它使用thrust::transform
和e
和t
.[=28= 的移位表示同时考虑e
和t
]确定
f
将为每个段保留的值。现在我们知道每个 (t-) 段的结尾,我们可以简单地从i
中选择下一个值(使用copy_if
,并将段结束标记作为我们的模板)作为f
前一段的值。为了方便这一点,并且由于您的i
只是一个索引序列,我创建了一个i
向量,它比您显示的内容长一个元素。为每个段创建一个数值递增的索引。这只是对步骤 1 中创建的矢量的独占扫描。
使用第 3 步中创建的索引序列,将第 2 步中创建的
f
段值"scatter",放入我们的f
结果中("scatter" 是用thrust::copy
和一个置换迭代器完成的)。
这是一个有效的例子,借用了你的代码:
$ cat t835.cu
#include <thrust/host_vector.h>
#include <thrust/copy.h>
#include <thrust/device_vector.h>
#include <stdint.h>
#include <iostream>
#include <thrust/transform.h>
#include <thrust/iterator/zip_iterator.h>
#include <thrust/iterator/permutation_iterator.h>
#include <thrust/sequence.h>
#include <thrust/scan.h>
using namespace thrust::placeholders;
struct my_semarker_func
{
template <typename T>
__host__ __device__
uint32_t operator()(const T &d1, const T &d2){
if (thrust::get<0>(d1) != thrust::get<0>(d2)) return 1;
if (thrust::get<1>(d1) != thrust::get<1>(d2)) return 1;
return 0;}
};
#define PRINTER(name) print(#name, (name))
template <template <typename...> class V, typename T, typename ...Args>
void print(const char* name, const V<T,Args...> & v)
{
std::cout << name << ":\t";
thrust::copy(v.begin(), v.end(), std::ostream_iterator<T>(std::cout, "\t"));
std::cout << std::endl;
}
int main()
{
uint32_t e[] = {0,0,0,0,0,0,1,1,1};
uint32_t t[] = {1,1,4,4,4,5,1,6,7};
int size = sizeof(t)/sizeof(t[0]);
typedef thrust::host_vector<uint32_t> HVec;
typedef thrust::device_vector<uint32_t> DVec;
HVec h_e(e,e+size);
HVec h_t(t,t+size);
DVec d_i(size+1);
DVec d_e = h_e;
DVec d_t = h_t;
thrust::sequence(d_i.begin(), d_i.end());
PRINTER(d_e);
PRINTER(d_t);
PRINTER(d_i);
// create segment end markers
DVec d_s(size,1);
thrust::transform(thrust::make_zip_iterator(thrust::make_tuple(d_e.begin(), d_t.begin())), thrust::make_zip_iterator(thrust::make_tuple(d_e.end()-1, d_t.end()-1)), thrust::make_zip_iterator(thrust::make_tuple(d_e.begin()+1, d_t.begin()+1)), d_s.begin(), my_semarker_func());
// create segment f values
DVec d_g(size);
thrust::copy_if(d_i.begin()+1, d_i.end(), d_s.begin(), d_g.begin(), _1 == 1);
// create segment indices
DVec d_h(size);
thrust::exclusive_scan(d_s.begin(), d_s.end(), d_h.begin());
// create f
DVec d_f(size);
thrust::copy_n(thrust::make_permutation_iterator(d_g.begin(), d_h.begin()), size, d_f.begin());
PRINTER(d_f);
return 0;
}
$ nvcc -std=c++11 -o t835 t835.cu
$ ./t835
d_e: 0 0 0 0 0 0 1 1 1
d_t: 1 1 4 4 4 5 1 6 7
d_i: 0 1 2 3 4 5 6 7 8 9
d_f: 2 2 5 5 5 6 7 8 9
$
可以使用非常相似的序列来创建 l
向量。
我找到了另一种方法来做到这一点。
为了能够使用 lower_bound
,我需要确保 t
是 全局 排序的。为了做到这一点,我首先找出每个
sub_segment 使用 adjacent_difference
。之后,scatter_if
从 counting_iterator
为子段的每个起点复制递增的数字。最后,inclusive_scan
为每个子段传播相同的值。我将 inclusive_scan
之前的两个步骤组合到自定义仿函数 my_scatter
中以实现更好的内核融合。
现在 upper_bound
应用于这些全局递增的值以计算 f
。
l
可以通过在 e
上应用 upper_bound
来计算。
我不确定我的方法的效率与@RobertCrovella 提出的方法相比如何。
输出:
d_e: 0 0 0 0 0 0 1 1 1
d_t: 1 1 4 4 4 5 1 6 7
d_i: 0 1 2 3 4 5 6 7 8
norm_t: 0 0 2 2 2 7 13 20 28
d_f: 2 2 5 5 5 6 7 8 9
d_l: 6 6 6 6 6 6 9 9 9
#include <thrust/host_vector.h>
#include <thrust/copy.h>
#include <thrust/binary_search.h>
#include <thrust/device_vector.h>
#include <thrust/iterator/counting_iterator.h>
#include <thrust/iterator/transform_iterator.h>
#include <thrust/adjacent_difference.h>
#include <thrust/functional.h>
#include <stdint.h>
#include <iostream>
#include <thrust/scatter.h>
#include <thrust/scan.h>
#include <thrust/transform.h>
#define PRINTER(name) print(#name, (name))
template <template <typename...> class V, typename T, typename ...Args>
void print(const char* name, const V<T,Args...> & v)
{
std::cout << name << ":\t";
thrust::copy(v.begin(), v.end(), std::ostream_iterator<T>(std::cout, "\t"));
std::cout << std::endl;
}
template <typename IteratorType, typename IndexType = uint32_t>
struct my_scatter : public thrust::unary_function<IndexType,IndexType>
{
my_scatter(IteratorType first) : first(first)
{
}
__host__ __device__
IndexType operator()(const IndexType& i)
{
IndexType result = i;
if (i > static_cast<IndexType>(0) && *(first+i) == *(first+i-static_cast<IndexType>(1)))
{
result = static_cast<IndexType>(0);
}
return result;
}
IteratorType first;
};
template <typename IteratorType>
my_scatter<IteratorType> make_my_scatter(IteratorType first)
{
return my_scatter<IteratorType>(first);
}
int main()
{
uint32_t e[] = {0,0,0,0,0,0,1,1,1};
uint32_t t[] = {1,1,4,4,4,5,1,6,7};
uint32_t i[] = {0,1,2,3,4,5,6,7,8};
int size = sizeof(i)/sizeof(i[0]);
typedef thrust::host_vector<uint32_t> HVec;
typedef thrust::device_vector<uint32_t> DVec;
HVec h_i(i,i+size);
HVec h_e(e,e+size);
HVec h_t(t,t+size);
DVec d_i = h_i;
DVec d_e = h_e;
DVec d_t = h_t;
PRINTER(d_e);
PRINTER(d_t);
PRINTER(d_i);
DVec norm_t(size);
auto my_scatter_op = make_my_scatter(zip(d_e.begin(), d_t.begin()));
auto ti_begin = thrust::make_transform_iterator(thrust::make_counting_iterator(0), my_scatter_op);
auto ti_end = thrust::make_transform_iterator(thrust::make_counting_iterator(size), my_scatter_op);
thrust::inclusive_scan(ti_begin, ti_end, norm_t.begin());
PRINTER(norm_t);
DVec d_f(size);
thrust::upper_bound(norm_t.begin(), norm_t.end(), norm_t.begin(), norm_t.end(), d_f.begin());
PRINTER(d_f);
DVec d_l(size);
thrust::upper_bound(d_e.begin(), d_e.end(), d_e.begin(), d_e.end(), d_l.begin());
PRINTER(d_l);
return 0;
}