C++ 数值之谜
C++ numerical mystery
我写了一个 class 将一些任务分成两个子任务。
例如将两个相同大小的向量相加。
我把它设计成一个二进制函数,我的意思是输入指针之一也是输出。
我还假设输入和输出的大小相同。
然后我写了一个稍微修改的应用程序,以管理输入和输出大小不同的情况。
例如向量元素的水平相加。
自从这次修改后,发生了一些奇怪的事情。
我将它应用于 unsigned short 的过程直方图。
直方图存储在大小为 65536 的 std::vector<std::uint64_t>
中。
我想将两个直方图合并为一个,清除另一个。
在我的 class 将输入拆分为两个输出元素的那一刻,神秘的事情发生了。
65536 / 2 = 1099511660544.
像每个人一样,我怀疑可以复制到另一种类型的范围饱和。
但为了管理尺寸,我在任何地方都使用 std::size_t
。
我在 64 位机器上工作,所以我认为这不是饱和问题。
任何帮助都会有所帮助。
提前致谢。
---- 编辑----
请允许我举一个实际的例子来说明我的问题:
worker.hpp :
#ifndef WORKER_HPP
#define WORKER_HPP
#include <tbb/task.h>
#include <type_traits>
#ifndef TBB_ASSERT
#ifdef _DEBUG
#include <cassert>
#define TBB_ASSERT(x) std::assert(x)
#else
#define TBB_ASSERT(x)
#endif
#endif
#define OUTPUT_SIZE_EQUAL_INPUT_SIZE std::size_t(0x80000000)
namespace separable_calculation
{
template<class _Sty,class _Dty = _Sty>
struct basic_operator
{
typedef const _Sty* const_src_pointer;
typedef _Dty* dst_pointer;
typedef const_src_pointer const_pointer;
typedef dst_pointer pointer;
basic_operator() = default;
basic_operator(const basic_operator&) = delete;
basic_operator(basic_operator&&) = delete;
virtual ~basic_operator() = default;
basic_operator& operator=(const basic_operator&) = delete;
basic_operator& operator=(basic_operator&&) = delete;
virtual void operator()(const_pointer,const std::size_t&,pointer/*,const std::size_t&*/) = 0;
};
template<class _Ty,template<class>class _Op>
class worker : public tbb::task
{
// static_assert(std::is_function<_Op>::value || std::is_base_of<basic_operator<_Ty>*,&_Op>::value,"The operator functor must be a derivation of the struct basic_operator");
static_assert(!std::is_abstract<_Op<_Ty> >::value,"The operator struct / class is abstract");
public:
typedef _Ty* pointer;
typedef const _Ty* const_pointer;
typedef std::size_t size_type;
private:
typedef worker self;
enum
{
MUST_BE_PROCESSED = 0x2,
HAS_BEEN_PROCESSED = 0x4,
FIRST_HALF_MUTEX = 0x8,
SECOND_HALF_MUTEX = 0x10
};
const_pointer _src;
size_type _sz;
pointer _dst;
size_type _dsz;
unsigned char _status;
inline worker get_first_half()
{
std::size_t chk(2);
return worker(this->_src,this->_sz/chk,this->_dst,this->_dsz/chk);
}
inline worker get_second_half()
{
const std::size_t half_src = this->_sz/2;
const std::size_t half_dst = this->_sz/2;
return worker(this->_src+half_src,half_src,this->_dst+half_dst,half_dst);
}
// helpfull for the method run.
worker(worker&& obj);
// usefull for prepare the two sub-tasks.
worker(const worker& obj,int);
public:
worker(const_pointer src,const size_type& sz,pointer dst,const size_type& dsz = OUTPUT_SIZE_EQUAL_INPUT_SIZE);
inline worker():
worker(nullptr,0,nullptr,0)
{}
worker(const worker&) = delete;
~worker() = default;
worker& operator=(const worker&) = delete;
worker& operator=(worker&&) = delete;
virtual tbb::task* execute();
// Execute the tasks.
void run();
// Update the source and | or the destination pointers.
void update(const_pointer src,const std::size_t& sz,pointer dst=nullptr,const std::size_t& dsz = OUTPUT_SIZE_EQUAL_INPUT_SIZE);
};
}
#endif // WORKER_H
worker.tcc :
#ifndef WORKER_TCC
#define WORKER_TCC
#include <iostream>
#define DEBUG(str) std::cout<<str<<" "<<this->_sz<<" "<<this->_dsz<<std::endl;
namespace separable_calculation
{
template<class _Ty,template<class>class _Op>
worker<_Ty,_Op>::worker(worker&& obj):
_src(obj._src),
_sz(obj._sz),
_dst(obj._dst),
_dsz(obj._dsz),
_status(obj._status)
{
DEBUG("move ctor")
}
template<class _Ty,template<class>class _Op>
worker<_Ty,_Op>::worker(const worker& obj,int):
_src(obj._src),
_sz(obj._sz),
_dst(obj._dst),
_dsz(obj._dsz),
_status(HAS_BEEN_PROCESSED)
{
DEBUG("copy ctor")
}
template<class _Ty,template<class>class _Op>
worker<_Ty,_Op>::worker(const_pointer src,const size_type& sz,pointer dst,const size_type& dsz):
_src(src),
_sz(sz),
_dst(dst),
_dsz(dsz == OUTPUT_SIZE_EQUAL_INPUT_SIZE ? sz : dsz),
_status(MUST_BE_PROCESSED)
{
DEBUG("param ctor")
}
template<class _Ty,template<class>class _Op>
tbb::task* worker<_Ty,_Op>::execute()
{
tbb::task* ret(nullptr);
// prepare the two sub-tasks
if((this->_status & MUST_BE_PROCESSED) == MUST_BE_PROCESSED)
{
tbb::task* a = new (this->allocate_child()) self(this->get_first_half(),int());
tbb::task* b = new (this->allocate_child()) self(this->get_second_half(),int());
this->set_ref_count(3);
this->spawn(*a);
this->spawn_and_wait_for_all(*b);
}
else
{
_Op<_Ty> op;
std::cout<<"check "<<this->_sz<<" "<<this->_dsz<<std::endl;
op(this->_src,this->_sz,this->_dst/*,this->_dsz*/);
}
return ret;
}
template<class _Ty,template<class>class _Op>
void worker<_Ty,_Op>::run()
{
TBB_ASSERT(this->_dst && this->_src);
self& s = *new(tbb::task::allocate_root()) self(std::move(*this));
tbb::task::spawn_root_and_wait(s);
}
template<class _Ty,template<class>class _Op>
void worker<_Ty,_Op>::update(const_pointer src,const std::size_t& sz,pointer dst,const std::size_t& dsz)
{
std::cout<<"update"<<std::endl;
if(src)
{
this->_src = src;
this->_sz = sz;
}
if(dst)
this->_dst = dst;
if(dsz != OUTPUT_SIZE_EQUAL_INPUT_SIZE)
this->_dsz = (dsz != OUTPUT_SIZE_EQUAL_INPUT_SIZE) ? dsz : this->_sz;
this->_status = MUST_BE_PROCESSED;
}
}
#endif // WORKER_TCC
main.cpp :
#include "worker.hpp"
#include "worker.tcc"
#include <cstdlib>
namespace
{
template<class _Ty>
struct add_t : separable_calculation::basic_operator<_Ty>
{
typedef separable_calculation::basic_operator<_Ty> MyBase;
typedef typename MyBase::const_pointer const_pointer;
typedef typename MyBase::pointer pointer;
typedef typename MyBase::const_src_pointer const_src_pointer;
typedef typename MyBase::dst_pointer dst_pointer;
add_t() = default;
virtual ~add_t() = default;
virtual void operator()(const_pointer src,const std::size_t& sz,pointer dst/*,const std::size_t& dsz*/)
{
pointer it_d = dst;
for(const_pointer it_s = src;it_s != (src+sz); it_s++,it_d++)
*it_d += *it_s;
}
};
}
int main()
{
std::vector<std::uint64_t> a(65536,1);
std::vector<std::uint64_t> b(a);
separable_calculation::worker<std::uint64_t,
add_t> calc(a.data(),a.size(),b.data(),b.size());
calc.run();
return EXIT_SUCCESS;
}
我对这个例子的输出是:
param ctor 65536 65536
move ctor 65536 65536
param ctor 1099511660544 32768
copy ctor 1099511660544 32768
param ctor 1099511660544 32768
copy ctor 1099511660544 32768
check 1099511660544 32768
然后就崩溃了。
现在,如果你按原样中和 class worker 上的 _dsz :
假设 worker2.hpp :
#ifndef WORKER2_HPP
#define WORKER2_HPP
#include <tbb/task.h>
#include <type_traits>
#ifndef TBB_ASSERT
#ifdef _DEBUG
#include <cassert>
#define TBB_ASSERT(x) std::assert(x)
#else
#define TBB_ASSERT(x)
#endif
#endif
namespace separable_calculation2
{
template<class _Ty,template<class>class _Op>
class worker : public tbb::task
{
// static_assert(std::is_function<_Op>::value || std::is_base_of<basic_operator<_Ty>*,&_Op>::value,"The operator functor must be a derivation of the struct basic_operator");
static_assert(!std::is_abstract<_Op<_Ty> >::value,"The operator struct / class is abstract");
public:
typedef _Ty* pointer;
typedef const _Ty* const_pointer;
typedef std::size_t size_type;
private:
typedef worker self;
enum
{
MUST_BE_PROCESSED = 0x2,
HAS_BEEN_PROCESSED = 0x4,
FIRST_HALF_MUTEX = 0x8,
SECOND_HALF_MUTEX = 0x10
};
const_pointer _src;
size_type _sz;
pointer _dst;
// size_type _dsz;
unsigned char _status;
inline worker get_first_half()
{
std::size_t chk(2);
return worker(this->_src,this->_sz/chk,this->_dst/*,this->_dsz/chk*/);
}
inline worker get_second_half()
{
const std::size_t half_src = this->_sz/2;
const std::size_t half_dst = this->_sz/2;
return worker(this->_src+half_src,half_src,this->_dst+half_dst/*,half_dst*/);
}
// helpfull for the method run.
worker(worker&& obj);
// usefull for prepare the two sub-tasks.
worker(const worker& obj,int);
public:
worker(const_pointer src,const size_type& sz,pointer dst/*,const size_type& dsz = OUTPUT_SIZE_EQUAL_INPUT_SIZE*/);
inline worker():
worker(nullptr,0,nullptr,0)
{}
worker(const worker&) = delete;
~worker() = default;
worker& operator=(const worker&) = delete;
worker& operator=(worker&&) = delete;
virtual tbb::task* execute();
// Execute the tasks.
void run();
// Update the source and | or the destination pointers.
void update(const_pointer src,const std::size_t& sz,pointer dst=nullptr/*,const std::size_t& dsz = OUTPUT_SIZE_EQUAL_INPUT_SIZE*/);
};
}
#endif // WORKER2_H
worker2.tcc:
#ifndef WORKER2_TCC
#define WORKER2_TCC
#include <iostream>
#define DEBUG(str) std::cout<<str<<" "<<this->_sz<<" "<<this->_dsz<<std::endl;
namespace separable_calculation2
{
template<class _Ty,template<class>class _Op>
worker<_Ty,_Op>::worker(worker&& obj):
_src(obj._src),
_sz(obj._sz),
_dst(obj._dst),
// _dsz(obj._dsz),
_status(obj._status)
{
// DEBUG("move ctor")
}
templa
te<class _Ty,template<class>class _Op>
worker<_Ty,_Op>::worker(const worker& obj,int):
_src(obj._src),
_sz(obj._sz),
_dst(obj._dst),
// _dsz(obj._dsz),
_status(HAS_BEEN_PROCESSED)
{
// DEBUG("copy ctor")
}
template<class _Ty,template<class>class _Op>
worker<_Ty,_Op>::worker(const_pointer src,const size_type& sz,pointer dst/*,const size_type& dsz*/):
_src(src),
_sz(sz),
_dst(dst),
// _dsz(dsz == OUTPUT_SIZE_EQUAL_INPUT_SIZE ? sz : dsz),
_status(MUST_BE_PROCESSED)
{
// DEBUG("param ctor")
}
template<class _Ty,template<class>class _Op>
tbb::task* worker<_Ty,_Op>::execute()
{
tbb::task* ret(nullptr);
// prepare the two sub-tasks
if((this->_status & MUST_BE_PROCESSED) == MUST_BE_PROCESSED)
{
tbb::task* a = new (this->allocate_child()) self(this->get_first_half(),int());
tbb::task* b = new (this->allocate_child()) self(this->get_second_half(),int());
this->set_ref_count(3);
this->spawn(*a);
this->spawn_and_wait_for_all(*b);
}
else
{
_Op<_Ty> op;
// std::cout<<"check "<<this->_sz<<" "<<this->_dsz<<std::endl;
op(this->_src,this->_sz,this->_dst/*,this->_dsz*/);
}
return ret;
}
template<class _Ty,template<class>class _Op>
void worker<_Ty,_Op>::run()
{
TBB_ASSERT(this->_dst && this->_src);
self& s = *new(tbb::task::allocate_root()) self(std::move(*this));
tbb::task::spawn_root_and_wait(s);
}
template<class _Ty,template<class>class _Op>
void worker<_Ty,_Op>::update(const_pointer src,const size_type& sz,pointer dst/*,const std::size_t& dsz*/)
{
std::cout<<"update"<<std::endl;
if(src)
{
this->_src = src;
this->_sz = sz;
}
if(dst)
this->_dst = dst;
// if(dsz != OUTPUT_SIZE_EQUAL_INPUT_SIZE)
// this->_dsz = (dsz != OUTPUT_SIZE_EQUAL_INPUT_SIZE) ? dsz : this->_sz;
this->_status = MUST_BE_PROCESSED;
}
}
#endif // WORKER2_TCC
现在在 main.cpp 添加:
#include "worker2.hpp"
#include "worker2.tcc"
并在函数 main 中添加中和之前的 calc 对象并写入:
separable_calculation2::worker<std::uint64_t,add_t> calc(a.data(),a.size(),b.data());
calc.run();
std::cout<<"END PROCESSING"<<std::endl;
我修好了。
所以问题出在方法get_first_half()
和get_second_half()
上。
这些函数对应的代码是这个:
inline worker get_first_half()
{
std::size_t chk(2);
return worker(this->_src,this->_sz/chk,this->_dst,this->_dsz/chk);
}
inline worker get_second_half()
{
const std::size_t half_src = this->_sz/2;
const std::size_t half_dst = this->_sz/2;
return worker(this->_src+half_src,half_src,this->_dst+half_dst,half_dst);
}
出于某种原因,我仍然想知道临时对象之前的临时对象 65536/2 == 32768 里面的临时对象仅用于第二个参数 65536/2 == 1099511660544。
如果创建一个对象并且 return 它可以工作。
即
inline worker get_first_half()
{
worker tmp(std::move(*this));
tmp._sz/=2;
tmp._dsz/=2;
return tmp;
}
inline worker get_second_half()
{
const std::size_t half_src = this->_sz/2;
const std::size_t half_dst = this->_sz/2;
worker tmp(std::move(*this));
tmp._sz/=2;
tmp._dsz/=2;
tmp._src+=tmp._sz;
tmp._dst+=tmp._dsz;
return tmp;
}
如果有人知道第一个实现有什么问题,我仍然很想了解这个问题。
否则就解决了。
这不是答案,只是对您的代码的注释。
我发现您使用低级任务 API(有高级 task_group
和 parallel_invoke
)但仍然采用阻塞方式,因此效率低下:
this->set_ref_count(3);
this->spawn(*a);
this->spawn_and_wait_for_all(*b);
它的效率低于连续传递样式。可读性不如高级 API
我写了一个 class 将一些任务分成两个子任务。
例如将两个相同大小的向量相加。
我把它设计成一个二进制函数,我的意思是输入指针之一也是输出。
我还假设输入和输出的大小相同。
然后我写了一个稍微修改的应用程序,以管理输入和输出大小不同的情况。
例如向量元素的水平相加。
自从这次修改后,发生了一些奇怪的事情。
我将它应用于 unsigned short 的过程直方图。
直方图存储在大小为 65536 的 std::vector<std::uint64_t>
中。
我想将两个直方图合并为一个,清除另一个。
在我的 class 将输入拆分为两个输出元素的那一刻,神秘的事情发生了。
65536 / 2 = 1099511660544.
像每个人一样,我怀疑可以复制到另一种类型的范围饱和。
但为了管理尺寸,我在任何地方都使用 std::size_t
。
我在 64 位机器上工作,所以我认为这不是饱和问题。
任何帮助都会有所帮助。
提前致谢。
---- 编辑----
请允许我举一个实际的例子来说明我的问题:
worker.hpp :
#ifndef WORKER_HPP
#define WORKER_HPP
#include <tbb/task.h>
#include <type_traits>
#ifndef TBB_ASSERT
#ifdef _DEBUG
#include <cassert>
#define TBB_ASSERT(x) std::assert(x)
#else
#define TBB_ASSERT(x)
#endif
#endif
#define OUTPUT_SIZE_EQUAL_INPUT_SIZE std::size_t(0x80000000)
namespace separable_calculation
{
template<class _Sty,class _Dty = _Sty>
struct basic_operator
{
typedef const _Sty* const_src_pointer;
typedef _Dty* dst_pointer;
typedef const_src_pointer const_pointer;
typedef dst_pointer pointer;
basic_operator() = default;
basic_operator(const basic_operator&) = delete;
basic_operator(basic_operator&&) = delete;
virtual ~basic_operator() = default;
basic_operator& operator=(const basic_operator&) = delete;
basic_operator& operator=(basic_operator&&) = delete;
virtual void operator()(const_pointer,const std::size_t&,pointer/*,const std::size_t&*/) = 0;
};
template<class _Ty,template<class>class _Op>
class worker : public tbb::task
{
// static_assert(std::is_function<_Op>::value || std::is_base_of<basic_operator<_Ty>*,&_Op>::value,"The operator functor must be a derivation of the struct basic_operator");
static_assert(!std::is_abstract<_Op<_Ty> >::value,"The operator struct / class is abstract");
public:
typedef _Ty* pointer;
typedef const _Ty* const_pointer;
typedef std::size_t size_type;
private:
typedef worker self;
enum
{
MUST_BE_PROCESSED = 0x2,
HAS_BEEN_PROCESSED = 0x4,
FIRST_HALF_MUTEX = 0x8,
SECOND_HALF_MUTEX = 0x10
};
const_pointer _src;
size_type _sz;
pointer _dst;
size_type _dsz;
unsigned char _status;
inline worker get_first_half()
{
std::size_t chk(2);
return worker(this->_src,this->_sz/chk,this->_dst,this->_dsz/chk);
}
inline worker get_second_half()
{
const std::size_t half_src = this->_sz/2;
const std::size_t half_dst = this->_sz/2;
return worker(this->_src+half_src,half_src,this->_dst+half_dst,half_dst);
}
// helpfull for the method run.
worker(worker&& obj);
// usefull for prepare the two sub-tasks.
worker(const worker& obj,int);
public:
worker(const_pointer src,const size_type& sz,pointer dst,const size_type& dsz = OUTPUT_SIZE_EQUAL_INPUT_SIZE);
inline worker():
worker(nullptr,0,nullptr,0)
{}
worker(const worker&) = delete;
~worker() = default;
worker& operator=(const worker&) = delete;
worker& operator=(worker&&) = delete;
virtual tbb::task* execute();
// Execute the tasks.
void run();
// Update the source and | or the destination pointers.
void update(const_pointer src,const std::size_t& sz,pointer dst=nullptr,const std::size_t& dsz = OUTPUT_SIZE_EQUAL_INPUT_SIZE);
};
}
#endif // WORKER_H
worker.tcc :
#ifndef WORKER_TCC
#define WORKER_TCC
#include <iostream>
#define DEBUG(str) std::cout<<str<<" "<<this->_sz<<" "<<this->_dsz<<std::endl;
namespace separable_calculation
{
template<class _Ty,template<class>class _Op>
worker<_Ty,_Op>::worker(worker&& obj):
_src(obj._src),
_sz(obj._sz),
_dst(obj._dst),
_dsz(obj._dsz),
_status(obj._status)
{
DEBUG("move ctor")
}
template<class _Ty,template<class>class _Op>
worker<_Ty,_Op>::worker(const worker& obj,int):
_src(obj._src),
_sz(obj._sz),
_dst(obj._dst),
_dsz(obj._dsz),
_status(HAS_BEEN_PROCESSED)
{
DEBUG("copy ctor")
}
template<class _Ty,template<class>class _Op>
worker<_Ty,_Op>::worker(const_pointer src,const size_type& sz,pointer dst,const size_type& dsz):
_src(src),
_sz(sz),
_dst(dst),
_dsz(dsz == OUTPUT_SIZE_EQUAL_INPUT_SIZE ? sz : dsz),
_status(MUST_BE_PROCESSED)
{
DEBUG("param ctor")
}
template<class _Ty,template<class>class _Op>
tbb::task* worker<_Ty,_Op>::execute()
{
tbb::task* ret(nullptr);
// prepare the two sub-tasks
if((this->_status & MUST_BE_PROCESSED) == MUST_BE_PROCESSED)
{
tbb::task* a = new (this->allocate_child()) self(this->get_first_half(),int());
tbb::task* b = new (this->allocate_child()) self(this->get_second_half(),int());
this->set_ref_count(3);
this->spawn(*a);
this->spawn_and_wait_for_all(*b);
}
else
{
_Op<_Ty> op;
std::cout<<"check "<<this->_sz<<" "<<this->_dsz<<std::endl;
op(this->_src,this->_sz,this->_dst/*,this->_dsz*/);
}
return ret;
}
template<class _Ty,template<class>class _Op>
void worker<_Ty,_Op>::run()
{
TBB_ASSERT(this->_dst && this->_src);
self& s = *new(tbb::task::allocate_root()) self(std::move(*this));
tbb::task::spawn_root_and_wait(s);
}
template<class _Ty,template<class>class _Op>
void worker<_Ty,_Op>::update(const_pointer src,const std::size_t& sz,pointer dst,const std::size_t& dsz)
{
std::cout<<"update"<<std::endl;
if(src)
{
this->_src = src;
this->_sz = sz;
}
if(dst)
this->_dst = dst;
if(dsz != OUTPUT_SIZE_EQUAL_INPUT_SIZE)
this->_dsz = (dsz != OUTPUT_SIZE_EQUAL_INPUT_SIZE) ? dsz : this->_sz;
this->_status = MUST_BE_PROCESSED;
}
}
#endif // WORKER_TCC
main.cpp :
#include "worker.hpp"
#include "worker.tcc"
#include <cstdlib>
namespace
{
template<class _Ty>
struct add_t : separable_calculation::basic_operator<_Ty>
{
typedef separable_calculation::basic_operator<_Ty> MyBase;
typedef typename MyBase::const_pointer const_pointer;
typedef typename MyBase::pointer pointer;
typedef typename MyBase::const_src_pointer const_src_pointer;
typedef typename MyBase::dst_pointer dst_pointer;
add_t() = default;
virtual ~add_t() = default;
virtual void operator()(const_pointer src,const std::size_t& sz,pointer dst/*,const std::size_t& dsz*/)
{
pointer it_d = dst;
for(const_pointer it_s = src;it_s != (src+sz); it_s++,it_d++)
*it_d += *it_s;
}
};
}
int main()
{
std::vector<std::uint64_t> a(65536,1);
std::vector<std::uint64_t> b(a);
separable_calculation::worker<std::uint64_t,
add_t> calc(a.data(),a.size(),b.data(),b.size());
calc.run();
return EXIT_SUCCESS;
}
我对这个例子的输出是:
param ctor 65536 65536
move ctor 65536 65536
param ctor 1099511660544 32768
copy ctor 1099511660544 32768
param ctor 1099511660544 32768
copy ctor 1099511660544 32768
check 1099511660544 32768
然后就崩溃了。
现在,如果你按原样中和 class worker 上的 _dsz :
假设 worker2.hpp :
#ifndef WORKER2_HPP
#define WORKER2_HPP
#include <tbb/task.h>
#include <type_traits>
#ifndef TBB_ASSERT
#ifdef _DEBUG
#include <cassert>
#define TBB_ASSERT(x) std::assert(x)
#else
#define TBB_ASSERT(x)
#endif
#endif
namespace separable_calculation2
{
template<class _Ty,template<class>class _Op>
class worker : public tbb::task
{
// static_assert(std::is_function<_Op>::value || std::is_base_of<basic_operator<_Ty>*,&_Op>::value,"The operator functor must be a derivation of the struct basic_operator");
static_assert(!std::is_abstract<_Op<_Ty> >::value,"The operator struct / class is abstract");
public:
typedef _Ty* pointer;
typedef const _Ty* const_pointer;
typedef std::size_t size_type;
private:
typedef worker self;
enum
{
MUST_BE_PROCESSED = 0x2,
HAS_BEEN_PROCESSED = 0x4,
FIRST_HALF_MUTEX = 0x8,
SECOND_HALF_MUTEX = 0x10
};
const_pointer _src;
size_type _sz;
pointer _dst;
// size_type _dsz;
unsigned char _status;
inline worker get_first_half()
{
std::size_t chk(2);
return worker(this->_src,this->_sz/chk,this->_dst/*,this->_dsz/chk*/);
}
inline worker get_second_half()
{
const std::size_t half_src = this->_sz/2;
const std::size_t half_dst = this->_sz/2;
return worker(this->_src+half_src,half_src,this->_dst+half_dst/*,half_dst*/);
}
// helpfull for the method run.
worker(worker&& obj);
// usefull for prepare the two sub-tasks.
worker(const worker& obj,int);
public:
worker(const_pointer src,const size_type& sz,pointer dst/*,const size_type& dsz = OUTPUT_SIZE_EQUAL_INPUT_SIZE*/);
inline worker():
worker(nullptr,0,nullptr,0)
{}
worker(const worker&) = delete;
~worker() = default;
worker& operator=(const worker&) = delete;
worker& operator=(worker&&) = delete;
virtual tbb::task* execute();
// Execute the tasks.
void run();
// Update the source and | or the destination pointers.
void update(const_pointer src,const std::size_t& sz,pointer dst=nullptr/*,const std::size_t& dsz = OUTPUT_SIZE_EQUAL_INPUT_SIZE*/);
};
}
#endif // WORKER2_H
worker2.tcc:
#ifndef WORKER2_TCC
#define WORKER2_TCC
#include <iostream>
#define DEBUG(str) std::cout<<str<<" "<<this->_sz<<" "<<this->_dsz<<std::endl;
namespace separable_calculation2
{
template<class _Ty,template<class>class _Op>
worker<_Ty,_Op>::worker(worker&& obj):
_src(obj._src),
_sz(obj._sz),
_dst(obj._dst),
// _dsz(obj._dsz),
_status(obj._status)
{
// DEBUG("move ctor")
}
templa
te<class _Ty,template<class>class _Op>
worker<_Ty,_Op>::worker(const worker& obj,int):
_src(obj._src),
_sz(obj._sz),
_dst(obj._dst),
// _dsz(obj._dsz),
_status(HAS_BEEN_PROCESSED)
{
// DEBUG("copy ctor")
}
template<class _Ty,template<class>class _Op>
worker<_Ty,_Op>::worker(const_pointer src,const size_type& sz,pointer dst/*,const size_type& dsz*/):
_src(src),
_sz(sz),
_dst(dst),
// _dsz(dsz == OUTPUT_SIZE_EQUAL_INPUT_SIZE ? sz : dsz),
_status(MUST_BE_PROCESSED)
{
// DEBUG("param ctor")
}
template<class _Ty,template<class>class _Op>
tbb::task* worker<_Ty,_Op>::execute()
{
tbb::task* ret(nullptr);
// prepare the two sub-tasks
if((this->_status & MUST_BE_PROCESSED) == MUST_BE_PROCESSED)
{
tbb::task* a = new (this->allocate_child()) self(this->get_first_half(),int());
tbb::task* b = new (this->allocate_child()) self(this->get_second_half(),int());
this->set_ref_count(3);
this->spawn(*a);
this->spawn_and_wait_for_all(*b);
}
else
{
_Op<_Ty> op;
// std::cout<<"check "<<this->_sz<<" "<<this->_dsz<<std::endl;
op(this->_src,this->_sz,this->_dst/*,this->_dsz*/);
}
return ret;
}
template<class _Ty,template<class>class _Op>
void worker<_Ty,_Op>::run()
{
TBB_ASSERT(this->_dst && this->_src);
self& s = *new(tbb::task::allocate_root()) self(std::move(*this));
tbb::task::spawn_root_and_wait(s);
}
template<class _Ty,template<class>class _Op>
void worker<_Ty,_Op>::update(const_pointer src,const size_type& sz,pointer dst/*,const std::size_t& dsz*/)
{
std::cout<<"update"<<std::endl;
if(src)
{
this->_src = src;
this->_sz = sz;
}
if(dst)
this->_dst = dst;
// if(dsz != OUTPUT_SIZE_EQUAL_INPUT_SIZE)
// this->_dsz = (dsz != OUTPUT_SIZE_EQUAL_INPUT_SIZE) ? dsz : this->_sz;
this->_status = MUST_BE_PROCESSED;
}
}
#endif // WORKER2_TCC
现在在 main.cpp 添加:
#include "worker2.hpp"
#include "worker2.tcc"
并在函数 main 中添加中和之前的 calc 对象并写入:
separable_calculation2::worker<std::uint64_t,add_t> calc(a.data(),a.size(),b.data());
calc.run();
std::cout<<"END PROCESSING"<<std::endl;
我修好了。
所以问题出在方法get_first_half()
和get_second_half()
上。
这些函数对应的代码是这个:
inline worker get_first_half()
{
std::size_t chk(2);
return worker(this->_src,this->_sz/chk,this->_dst,this->_dsz/chk);
}
inline worker get_second_half()
{
const std::size_t half_src = this->_sz/2;
const std::size_t half_dst = this->_sz/2;
return worker(this->_src+half_src,half_src,this->_dst+half_dst,half_dst);
}
出于某种原因,我仍然想知道临时对象之前的临时对象 65536/2 == 32768 里面的临时对象仅用于第二个参数 65536/2 == 1099511660544。
如果创建一个对象并且 return 它可以工作。
即
inline worker get_first_half()
{
worker tmp(std::move(*this));
tmp._sz/=2;
tmp._dsz/=2;
return tmp;
}
inline worker get_second_half()
{
const std::size_t half_src = this->_sz/2;
const std::size_t half_dst = this->_sz/2;
worker tmp(std::move(*this));
tmp._sz/=2;
tmp._dsz/=2;
tmp._src+=tmp._sz;
tmp._dst+=tmp._dsz;
return tmp;
}
如果有人知道第一个实现有什么问题,我仍然很想了解这个问题。
否则就解决了。
这不是答案,只是对您的代码的注释。
我发现您使用低级任务 API(有高级 task_group
和 parallel_invoke
)但仍然采用阻塞方式,因此效率低下:
this->set_ref_count(3);
this->spawn(*a);
this->spawn_and_wait_for_all(*b);
它的效率低于连续传递样式。可读性不如高级 API