如何在 std::async 线程完成之前过早地杀死它们 *而不* 使用 std::atomic_bool?
How to prematurely kill std::async threads before they are finished *without* using a std::atomic_bool?
我有一个接受回调的函数,并用它在 10 个独立的线程上工作。但是,通常情况下并不是所有的工作都需要。例如,如果在第三个线程上获得了期望的结果,它应该停止对其余活动线程所做的所有工作。
这个答案 here 表明这是不可能的,除非你让回调函数接受一个额外的 std::atomic_bool
参数,该参数表明函数是否应该提前终止。
这个解决方案对我不起作用。工人在一个基础 class 中旋转起来,这个基础 class 的全部意义在于抽象出多线程的细节。我怎样才能做到这一点?我预计我将不得不放弃 std::async
以获得更多相关内容。
#include <iostream>
#include <future>
#include <vector>
class ABC{
public:
std::vector<std::future<int> > m_results;
ABC() {};
~ABC(){};
virtual int callback(int a) = 0;
void doStuffWithCallBack();
};
void ABC::doStuffWithCallBack(){
// start working
for(int i = 0; i < 10; ++i)
m_results.push_back(std::async(&ABC::callback, this, i));
// analyze results and cancel all threads when you get the 1
for(int j = 0; j < 10; ++j){
double foo = m_results[j].get();
if ( foo == 1){
break; // but threads continue running
}
}
std::cout << m_results[9].get() << " <- this shouldn't have ever been computed\n";
}
class Derived : public ABC {
public:
Derived() : ABC() {};
~Derived() {};
int callback(int a){
std::cout << a << "!\n";
if (a == 3)
return 1;
else
return 0;
};
};
int main(int argc, char **argv)
{
Derived myObj;
myObj.doStuffWithCallBack();
return 0;
}
我只想说这可能不应该是 'normal' 程序的一部分,因为它可能会泄漏资源 and/or 使您的程序处于不稳定状态,但为了科学...
如果您可以控制线程循环,并且不介意使用平台功能,则可以 inject an exception 进入线程。在 posix 中,您可以为此使用信号,在 Windows 中,您将不得不使用 SetThreadContext()。虽然异常通常会展开堆栈并调用析构函数,但当异常发生时,您的线程可能正在系统调用或其他 'non-exception safe place' 中。
免责声明:我目前只有Linux,所以我没有测试Windows代码。
#if defined(_WIN32)
# define ITS_WINDOWS
#else
# define ITS_POSIX
#endif
#if defined(ITS_POSIX)
#include <signal.h>
#endif
void throw_exception() throw(std::string())
{
throw std::string();
}
void init_exceptions()
{
volatile int i = 0;
if (i)
throw_exception();
}
bool abort_thread(std::thread &t)
{
#if defined(ITS_WINDOWS)
bool bSuccess = false;
HANDLE h = t.native_handle();
if (INVALID_HANDLE_VALUE == h)
return false;
if (INFINITE == SuspendThread(h))
return false;
CONTEXT ctx;
ctx.ContextFlags = CONTEXT_CONTROL;
if (GetThreadContext(h, &ctx))
{
#if defined( _WIN64 )
ctx.Rip = (DWORD)(DWORD_PTR)throw_exception;
#else
ctx.Eip = (DWORD)(DWORD_PTR)throw_exception;
#endif
bSuccess = SetThreadContext(h, &ctx) ? true : false;
}
ResumeThread(h);
return bSuccess;
#elif defined(ITS_POSIX)
pthread_kill(t.native_handle(), SIGUSR2);
#endif
return false;
}
#if defined(ITS_POSIX)
void worker_thread_sig(int sig)
{
if(SIGUSR2 == sig)
throw std::string();
}
#endif
void init_threads()
{
#if defined(ITS_POSIX)
struct sigaction sa;
sigemptyset(&sa.sa_mask);
sa.sa_flags = 0;
sa.sa_handler = worker_thread_sig;
sigaction(SIGUSR2, &sa, 0);
#endif
}
class tracker
{
public:
tracker() { printf("tracker()\n"); }
~tracker() { printf("~tracker()\n"); }
};
int main(int argc, char *argv[])
{
init_threads();
printf("main: starting thread...\n");
std::thread t([]()
{
try
{
tracker a;
init_exceptions();
printf("thread: started...\n");
std::this_thread::sleep_for(std::chrono::minutes(1000));
printf("thread: stopping...\n");
}
catch(std::string s)
{
printf("thread: exception caught...\n");
}
});
printf("main: sleeping...\n");
std::this_thread::sleep_for(std::chrono::seconds(2));
printf("main: aborting...\n");
abort_thread(t);
printf("main: joining...\n");
t.join();
printf("main: exiting...\n");
return 0;
}
输出:
main: starting thread...
main: sleeping...
tracker()
thread: started...
main: aborting...
main: joining...
~tracker()
thread: exception caught...
main: exiting...
我有一个接受回调的函数,并用它在 10 个独立的线程上工作。但是,通常情况下并不是所有的工作都需要。例如,如果在第三个线程上获得了期望的结果,它应该停止对其余活动线程所做的所有工作。
这个答案 here 表明这是不可能的,除非你让回调函数接受一个额外的 std::atomic_bool
参数,该参数表明函数是否应该提前终止。
这个解决方案对我不起作用。工人在一个基础 class 中旋转起来,这个基础 class 的全部意义在于抽象出多线程的细节。我怎样才能做到这一点?我预计我将不得不放弃 std::async
以获得更多相关内容。
#include <iostream>
#include <future>
#include <vector>
class ABC{
public:
std::vector<std::future<int> > m_results;
ABC() {};
~ABC(){};
virtual int callback(int a) = 0;
void doStuffWithCallBack();
};
void ABC::doStuffWithCallBack(){
// start working
for(int i = 0; i < 10; ++i)
m_results.push_back(std::async(&ABC::callback, this, i));
// analyze results and cancel all threads when you get the 1
for(int j = 0; j < 10; ++j){
double foo = m_results[j].get();
if ( foo == 1){
break; // but threads continue running
}
}
std::cout << m_results[9].get() << " <- this shouldn't have ever been computed\n";
}
class Derived : public ABC {
public:
Derived() : ABC() {};
~Derived() {};
int callback(int a){
std::cout << a << "!\n";
if (a == 3)
return 1;
else
return 0;
};
};
int main(int argc, char **argv)
{
Derived myObj;
myObj.doStuffWithCallBack();
return 0;
}
我只想说这可能不应该是 'normal' 程序的一部分,因为它可能会泄漏资源 and/or 使您的程序处于不稳定状态,但为了科学...
如果您可以控制线程循环,并且不介意使用平台功能,则可以 inject an exception 进入线程。在 posix 中,您可以为此使用信号,在 Windows 中,您将不得不使用 SetThreadContext()。虽然异常通常会展开堆栈并调用析构函数,但当异常发生时,您的线程可能正在系统调用或其他 'non-exception safe place' 中。
免责声明:我目前只有Linux,所以我没有测试Windows代码。
#if defined(_WIN32)
# define ITS_WINDOWS
#else
# define ITS_POSIX
#endif
#if defined(ITS_POSIX)
#include <signal.h>
#endif
void throw_exception() throw(std::string())
{
throw std::string();
}
void init_exceptions()
{
volatile int i = 0;
if (i)
throw_exception();
}
bool abort_thread(std::thread &t)
{
#if defined(ITS_WINDOWS)
bool bSuccess = false;
HANDLE h = t.native_handle();
if (INVALID_HANDLE_VALUE == h)
return false;
if (INFINITE == SuspendThread(h))
return false;
CONTEXT ctx;
ctx.ContextFlags = CONTEXT_CONTROL;
if (GetThreadContext(h, &ctx))
{
#if defined( _WIN64 )
ctx.Rip = (DWORD)(DWORD_PTR)throw_exception;
#else
ctx.Eip = (DWORD)(DWORD_PTR)throw_exception;
#endif
bSuccess = SetThreadContext(h, &ctx) ? true : false;
}
ResumeThread(h);
return bSuccess;
#elif defined(ITS_POSIX)
pthread_kill(t.native_handle(), SIGUSR2);
#endif
return false;
}
#if defined(ITS_POSIX)
void worker_thread_sig(int sig)
{
if(SIGUSR2 == sig)
throw std::string();
}
#endif
void init_threads()
{
#if defined(ITS_POSIX)
struct sigaction sa;
sigemptyset(&sa.sa_mask);
sa.sa_flags = 0;
sa.sa_handler = worker_thread_sig;
sigaction(SIGUSR2, &sa, 0);
#endif
}
class tracker
{
public:
tracker() { printf("tracker()\n"); }
~tracker() { printf("~tracker()\n"); }
};
int main(int argc, char *argv[])
{
init_threads();
printf("main: starting thread...\n");
std::thread t([]()
{
try
{
tracker a;
init_exceptions();
printf("thread: started...\n");
std::this_thread::sleep_for(std::chrono::minutes(1000));
printf("thread: stopping...\n");
}
catch(std::string s)
{
printf("thread: exception caught...\n");
}
});
printf("main: sleeping...\n");
std::this_thread::sleep_for(std::chrono::seconds(2));
printf("main: aborting...\n");
abort_thread(t);
printf("main: joining...\n");
t.join();
printf("main: exiting...\n");
return 0;
}
输出:
main: starting thread...
main: sleeping...
tracker()
thread: started...
main: aborting...
main: joining...
~tracker()
thread: exception caught...
main: exiting...