Boost threads - safe/guaranteed way to handle thread interrupts

I'm moving a C program that uses pthreads over to C++, and I'll be making extensive use of the Boost libraries to make the program multi-platform, portable, etc.

When working with threads in the past, my code has usually taken this form:


void threadFunc(void* pUserData)
{
    if ( !pUserData )
    {
        return;
    }

    myStruct* pData = (myStruct*)pUserData;
    bool bRun;

    // lock()/unlock(): helpers that lock/unlock the mutex stored in myStruct (not shown)
    lock(pData);
    bRun = pData->bRun;
    unlock(pData);

    while ( bRun )
    {
        // Do some stuff
        // ...
        // ...
        lock(pData);
        bRun = pData->bRun;   // re-read the flag under the lock
        unlock(pData);
    }

    /* Done execution of thread; External thread must have set pData->bRun
     * to FALSE.
     */
}

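This works as I expect. When I want the thread to shut down, I simply lock the mutex for the struct it is accessing (usually a member of the struct), toggle the bRun flag, and then join() on the thread. With boost threads, I've noticed that timed_join() times out in my code when doing non-blocking operations, similar to this SO question. This makes me suspect I'm not using boost threads correctly.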

On to the questions...

First, which of the two general thread structures is correct for having the thread properly catch the thread_interrupted exception?

Case A


void threadFunc( void* pUserData )
{
    while ( 1 )
    {
        try
        {
            // Do some stuff
            // ...
            // ...
            boost::this_thread::sleep(boost::posix_time::milliseconds(1));
        }
        catch(boost::thread_interrupted const& )
        {
            // Thread interrupted; Clean up
        }
    }
}

Case B


void threadFunc( void* pUserData )
{
    try
    {
        while ( 1 )
        {
            // Do some stuff
            // ...
            // ...
            boost::this_thread::sleep(boost::posix_time::milliseconds(1));
        }
    }
    catch(boost::thread_interrupted const& )
    {
        // Thread interrupted; Clean up
    }
}

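Second, what is the appropriate boost function to call in place of sleep if I want the thread to have a chance to catch the interrupt call, but without calling sleep() or giving up the rest of the CPU time slice it currently holds? From another SO question on this topic, it seems that a boost::this_thread::interruption_point() call might be what I'm looking for, but based on what I read in the SO question I took it from, I'm not 100% sure it will always work.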


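Finally, as I understand it, if my loop never calls some form of boost sleep() function or a similar interruption-point function, timed_join() will always time out, and I will either have to:

Is this assumption correct?


Thank you.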


References

  1. Boost Thread - How to acknowledge interrupt (accessed January 18).
  2. boost::this_thread::interruption_point() doesn't throw boost::thread_interrupted& exception (accessed January 18).

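Your code isn't exception safe. As a result, you can easily deadlock while waiting to join.

If an exception is thrown while you hold the mutex, your code never unlocks it. Any thread that later waits on that mutex may then deadlock.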
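The following minimal sketch makes this failure mode concrete. It uses the standard library's std::mutex and std::lock_guard, which behave like boost::mutex and boost::lock_guard in this respect. TracingMutex, doSomeStuff and leftLocked are hypothetical helpers invented for the demonstration. TracingMutex only records whether it is held, so the state can be checked without involving a second thread.

```cpp
#include <cassert>
#include <mutex>
#include <stdexcept>

// Hypothetical BasicLockable wrapper: behaves like std::mutex but records
// whether it is currently held, so the demo can inspect the state afterwards.
struct TracingMutex {
    std::mutex m;
    bool held = false;
    void lock() { m.lock(); held = true; }
    void unlock() { held = false; m.unlock(); }
};

// Hypothetical stand-in for "Do some stuff" that fails half-way through.
void doSomeStuff() { throw std::runtime_error("work failed"); }

// Pattern from the question: explicit lock()/unlock().
void manualLocking(TracingMutex& mx) {
    mx.lock();
    doSomeStuff();  // throws...
    mx.unlock();    // ...so this line is never reached
}

// RAII pattern: lock_guard's destructor runs during stack unwinding.
void raiiLocking(TracingMutex& mx) {
    std::lock_guard<TracingMutex> guard(mx);
    doSomeStuff();  // throws, but guard still unlocks mx
}

// Runs f, swallows its exception, and reports whether the mutex was left locked.
bool leftLocked(void (*f)(TracingMutex&)) {
    TracingMutex mx;
    try {
        f(mx);
    } catch (const std::runtime_error&) {
        // the error was handled, but was the mutex released?
    }
    bool leaked = mx.held;
    if (leaked) mx.unlock();  // clean up so the mutex is not destroyed while locked
    return leaked;
}
```

leftLocked(manualLocking) reports true (the lock leaked), while leftLocked(raiiLocking) reports false. In a multithreaded program, the leaked lock in the first case would block any other thread that later locks the same mutex, such as the one toggling bRun before join().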

Things to check:

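  1. First, your code doesn't show anywhere how you create the threads you're trying to interrupt.

    The function needs to run on a boost::thread-managed thread instance. This should be obvious, since without such an object it would be hard to call boost::thread::interrupt(). Nevertheless, I'd like to double-check it, because the signature of your thread function strongly suggests native POSIX pthread_create usage.

    A good idea is to check the return value of boost::this_thread::interruption_enabled() (which will be false for native threads):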

    The boost::this_thread interrupt related functions behave in a degraded mode when called from a thread created using the native interface, i.e. boost::this_thread::interruption_enabled() returns false. As consequence the use of boost::this_thread::disable_interruption and boost::this_thread::restore_interruption will do nothing and calls to boost::this_thread::interruption_point() will be just ignored.

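  2. How are your lock/unlock functions implemented? The same concern applies here: if you mix POSIX/non-boost APIs, you may miss signals.

    While you're at it, start using lock_guard or unique_lock so that your code is exception safe.

  3. Regarding your Case A vs. Case B question, Case B is the simplest.

    Case A doesn't show how you plan to exit the while loop. You could of course use break or return (or even goto), but as written it is an infinite loop.

  4. Besides sleep(), there is yield(), which explicitly gives up the remainder of the time slice. Note, however, that yield() is not among Boost.Thread's predefined interruption points. If the loop must stay interruptible, pair it with boost::this_thread::interruption_point().

    I know you said you don't want to give up the time slice, but I'm not sure you realize that this will make the thread burn CPU whenever there is no work to do.

    It strikes me as contradictory that you want to avoid giving up the remaining time slice (which suggests low-latency, high-bandwidth applications and a lock-free context) while also talking about using a mutex. A mutex is clearly far more expensive than yielding part of a time slice, since it is the very opposite of lock-free concurrency.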
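The difference between Case A and Case B can be illustrated without Boost, using a small model of cooperative interruption. In this sketch, thread_interrupted and interruption_point() are hypothetical stand-ins for boost::thread_interrupted and boost::this_thread::interruption_point(). The interruption request is just a std::atomic<bool>. Unlike in Boost, the sleep_for call here is not an interruption point, so only the explicit check can throw.

```cpp
#include <atomic>
#include <cassert>
#include <chrono>
#include <thread>

// Hypothetical stand-in for boost::thread_interrupted.
struct thread_interrupted {};

// Hypothetical stand-in for boost::this_thread::interruption_point():
// throws if the controlling thread has requested an interruption.
void interruption_point(const std::atomic<bool>& requested) {
    if (requested.load())
        throw thread_interrupted{};
}

// Case B: one try block around the whole loop. The first interruption point
// that sees the request unwinds out of the loop and the function returns.
int caseB(const std::atomic<bool>& requested) {
    int iterations = 0;
    try {
        while (true) {
            ++iterations;  // "Do some stuff"
            interruption_point(requested);
            std::this_thread::sleep_for(std::chrono::milliseconds(1));
        }
    } catch (const thread_interrupted&) {
        // Thread interrupted; clean up
    }
    return iterations;
}

// Case A: the try block sits inside the loop. After the handler runs, the loop
// would simply start over, so an explicit break is required to leave it.
int caseA(const std::atomic<bool>& requested) {
    int iterations = 0;
    while (true) {
        try {
            ++iterations;  // "Do some stuff"
            interruption_point(requested);
            std::this_thread::sleep_for(std::chrono::milliseconds(1));
        } catch (const thread_interrupted&) {
            // Thread interrupted; clean up
            break;  // without this, Case A never terminates
        }
    }
    return iterations;
}
```

Case B needs a single handler and no extra control flow. Case A terminates only because of the break in its handler.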

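So let me close with two demos in good style: one with locking and one lock-free (or lock-conservative, depending on the rest of your thread synchronization logic).

Locking demo

This uses "old-style" thread control (a keepRunning flag), which IMO is fine when you are locking anyway: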


#include <boost/thread.hpp>
#include <boost/bind/bind.hpp>
#include <iostream>
#include <string>
#include <vector>

void trace(std::string const& msg);

struct myStruct {
    bool keepRunning = true;

    using mutex_t = boost::mutex;
    using lock_t  = boost::unique_lock<mutex_t>;

    lock_t lock() const { return lock_t(mx); }

  private:
    mutex_t mutable mx;
};

void threadFunc(myStruct &userData) {
    trace("threadFunc enter");

    do 
    {
        {
            auto lock = userData.lock();
            if (!userData.keepRunning)
                break;
        } // destructor of unique_lock unlocks, exception safe


        // Do some stuff
        // ...
        // ...
        trace("(work)");
        boost::this_thread::sleep_for(boost::chrono::milliseconds(100));
    } while (true);

    trace("threadFunc exit");
}

int main() {
    trace("Starting main");

    boost::thread_group threads;
    constexpr int N = 4;
    std::vector<myStruct> data(N);

    for (int i=0; i < N; ++i)
        threads.create_thread(boost::bind(threadFunc, boost::ref(data[i])));

    boost::this_thread::sleep_for(boost::chrono::seconds(1));

    trace("Main signaling shutdown");

    for (auto& d : data) {
        auto lock = d.lock();
        d.keepRunning = false;
    }

    threads.join_all();
    trace("Bye");
}

void trace(std::string const& msg) {
    static boost::mutex mx;
    boost::lock_guard<boost::mutex> lk(mx);

    static int thread_id_gen = 0;
    thread_local int thread_id = thread_id_gen++;

    std::cout << "Thread #" << thread_id << ": " << msg << "\n";
}

Sample output:

Thread #0: Starting main
Thread #1: threadFunc enter
Thread #1: (work)
Thread #2: threadFunc enter
Thread #2: (work)
Thread #3: threadFunc enter
Thread #3: (work)
Thread #4: threadFunc enter
Thread #4: (work)
Thread #3: (work)
Thread #1: (work)
Thread #2: (work)
Thread #4: (work)
Thread #1: (work)
Thread #3: (work)
Thread #4: (work)
Thread #2: (work)
Thread #4: (work)
Thread #1: (work)
Thread #3: (work)
Thread #2: (work)
Thread #3: (work)
Thread #4: (work)
Thread #2: (work)
Thread #1: (work)
Thread #2: (work)
Thread #1: (work)
Thread #4: (work)
Thread #3: (work)
Thread #2: (work)
Thread #4: (work)
Thread #1: (work)
Thread #3: (work)
Thread #4: (work)
Thread #2: (work)
Thread #1: (work)
Thread #3: (work)
Thread #4: (work)
Thread #2: (work)
Thread #1: (work)
Thread #3: (work)
Thread #4: (work)
Thread #2: (work)
Thread #1: (work)
Thread #3: (work)
Thread #0: Main signaling shutdown
Thread #4: threadFunc exit
Thread #2: threadFunc exit
Thread #1: threadFunc exit
Thread #3: threadFunc exit
Thread #0: Bye

Lock-free demo
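The code for item 1 in the following list was only linked. As a rough illustration, here is what a literal lock-free translation of the locking demo might look like: the mutex-protected flag becomes a per-struct atomic flag. This sketch assumes std::thread and std::atomic<bool> in place of boost::thread and boost::atomic_bool, which behave the same way for this purpose. The workDone counter and runLockFreeDemo are hypothetical additions, used only to make the sketch testable.

```cpp
#include <atomic>
#include <cassert>
#include <chrono>
#include <cstddef>
#include <functional>
#include <thread>
#include <vector>

// Per-thread data: the mutex-protected keepRunning flag of the locking demo
// becomes an atomic<bool>, so reading and writing it needs no lock.
struct myStruct {
    std::atomic<bool> keepRunning{true};
    std::atomic<int> workDone{0};  // hypothetical counter, only for the demo
};

void threadFunc(myStruct& userData) {
    while (userData.keepRunning.load()) {
        // Do some stuff
        ++userData.workDone;
        std::this_thread::sleep_for(std::chrono::milliseconds(10));
    }
}

// Hypothetical driver: starts n workers, lets them run for runFor, asks each
// to stop, joins them, and returns the total number of loop iterations.
int runLockFreeDemo(std::size_t n, std::chrono::milliseconds runFor) {
    std::vector<myStruct> data(n);  // never resized, so references stay valid
    std::vector<std::thread> threads;
    for (auto& d : data)
        threads.emplace_back(threadFunc, std::ref(d));

    std::this_thread::sleep_for(runFor);

    for (auto& d : data)
        d.keepRunning = false;  // atomic store: no mutex required

    for (auto& t : threads)
        t.join();

    int total = 0;
    for (auto& d : data)
        total += d.workDone.load();
    return total;
}
```

Compared with the locking version, the only structural change is that each access to keepRunning is a single atomic load or store, so there is no lock to leak if the work throws.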

  1. A literal translation of the previous lockful example.


  2. However, it probably makes more sense now to have a single, shared shutdown flag:


    #include <boost/thread.hpp>
    #include <boost/atomic.hpp>
    #include <boost/bind/bind.hpp>
    #include <iostream>
    #include <string>
    
    void trace(std::string const& msg);
    
    struct myStruct {
        int value;
    };
    
    void threadFunc(myStruct userData, boost::atomic_bool& keepRunning) {
        std::string valuestr = std::to_string(userData.value);
        trace("threadFunc enter(" + valuestr + ")");
    
        do 
        {
            if (!keepRunning)
                break;
    
            // Do some stuff
            // ...
            // ...
            trace("(work" + valuestr + ")");
            boost::this_thread::sleep_for(boost::chrono::milliseconds(100));
        } while (true);
    
        trace("threadFunc " + valuestr + " exit");
    }
    
    int main() {
        boost::atomic_bool running { true };
        trace("Starting main");
    
        boost::thread_group threads;
        constexpr int N = 4;
    
        for (int i=0; i < N; ++i) {
            threads.create_thread(
                boost::bind(threadFunc, myStruct{i}, boost::ref(running)
            ));
        }
    
        boost::this_thread::sleep_for(boost::chrono::seconds(1));
    
        trace("Main signaling shutdown");
    
        running = false;
    
        threads.join_all();
        trace("Bye");
    }
    
    void trace(std::string const& msg) {
        static boost::mutex mx;
        boost::lock_guard<boost::mutex> lk(mx);
    
        static int thread_id_gen = 0;
        thread_local int thread_id = thread_id_gen++;
    
        std::cout << "Thread #" << thread_id << ": " << msg << "\n";
    }
    

    Note how much more "natural C++" it becomes to pass data into threadFunc using boost::bind¹.

  3. interruption_point()

    Note what the documentation of interrupt() states:

    Effects: If *this refers to a thread of execution, request that the thread will be interrupted the next time it enters one of the predefined interruption points with interruption enabled, or if it is currently blocked in a call to one of the predefined interruption points with interruption enabled [emphasis mine]

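    Note that interruption can be disabled temporarily. Boost provides only RAII-enabled classes for this, so the default is exception safe. If your code doesn't use class disable_interruption or class restore_interruption, you don't have to worry about this.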


    #include <boost/thread.hpp>
    #include <boost/bind/bind.hpp>
    #include <iostream>
    #include <string>
    
    void trace(std::string const& msg);
    
    struct myStruct {
        int value;
    };
    
    void threadFunc(myStruct userData) {
        std::string valuestr = std::to_string(userData.value);
        trace("threadFunc enter(" + valuestr + ")");
    
        // using `Case B` form from your question
        try {
            do 
            {
                boost::this_thread::sleep_for(boost::chrono::milliseconds(100)); // Avoids huge output for demo
    
                trace("(work" + valuestr + ")");
                boost::this_thread::interruption_point();
            } while (true);
        } catch(boost::thread_interrupted const&) {
            trace("threadFunc " + valuestr + " interrupted");
        }
    
        trace("threadFunc " + valuestr + " exit");
    }
    
    int main() {
        trace("Starting main");
    
        boost::thread_group threads;
        constexpr int N = 4;
    
        for (int i=0; i < N; ++i) {
            threads.create_thread(boost::bind(threadFunc, myStruct{i}));
        }
    
        boost::this_thread::sleep_for(boost::chrono::seconds(1));
    
        trace("Main signaling shutdown");
        threads.interrupt_all();
    
        threads.join_all();
        trace("Bye");
    }
    
    void trace(std::string const& msg) {
        static boost::mutex mx;
        boost::lock_guard<boost::mutex> lk(mx);
    
        static int thread_id_gen = 0;
        thread_local int thread_id = thread_id_gen++;
    
        std::cout << "Thread #" << thread_id << ": " << msg << "\n";
    }
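To illustrate why the RAII design matters, here is a hypothetical single-threaded model of the interruption state. Everything lives in namespace model and is not Boost's implementation. It shows the two properties quoted above. First, a pending request is acted upon only at an interruption point while interruption is enabled. Second, the scoped guard restores the previous state even when its scope is left by an exception.

```cpp
#include <cassert>

// Hypothetical model of Boost's per-thread interruption state;
// the names mirror Boost's, but this is NOT Boost's implementation.
namespace model {

struct thread_interrupted {};

thread_local bool interruptionEnabled = true;     // cf. interruption_enabled()
thread_local bool interruptionRequested = false;  // set by a hypothetical interrupt()

// Throws only when a request is pending *and* interruption is enabled.
// In this model the request is cleared once it has been acted upon.
void interruption_point() {
    if (interruptionEnabled && interruptionRequested) {
        interruptionRequested = false;
        throw thread_interrupted{};
    }
}

// RAII guard in the spirit of boost::this_thread::disable_interruption:
// the constructor disables interruption; the destructor restores the previous
// state, including when the scope is left because of an exception.
class disable_interruption {
    bool previous_;
public:
    disable_interruption() : previous_(interruptionEnabled) { interruptionEnabled = false; }
    ~disable_interruption() { interruptionEnabled = previous_; }
    disable_interruption(const disable_interruption&) = delete;
    disable_interruption& operator=(const disable_interruption&) = delete;
};

} // namespace model
```

For example, set model::interruptionRequested = true. Calling model::interruption_point() inside a model::disable_interruption scope then does not throw, while the same call after the scope ends throws model::thread_interrupted.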
    

¹ or a lambda, if you prefer
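For comparison, here is a minimal sketch of both styles using the standard library. It assumes std::bind and std::thread behave like boost::bind and boost::thread for this purpose. In each case the struct is copied into the thread, while the flag and the result slot are shared by reference (std::cref/std::ref for bind, & captures for the lambda). runWithBind, runWithLambda and the result parameter are hypothetical, added only to make the effect observable.

```cpp
#include <atomic>
#include <cassert>
#include <functional>
#include <thread>

struct myStruct {
    int value;
};

// Same shape as the shared-flag demo: data by value, shared state by reference.
// result is a hypothetical out-parameter recording which value the thread saw.
void threadFunc(myStruct userData, const std::atomic<bool>& keepRunning,
                std::atomic<int>& result) {
    while (keepRunning.load())
        std::this_thread::yield();
    result = userData.value;
}

int runWithBind(int v) {
    std::atomic<bool> running{true};
    std::atomic<int> result{-1};
    // bind stores a copy of myStruct{v}; cref/ref keep the flag and result shared.
    std::thread t(std::bind(threadFunc, myStruct{v}, std::cref(running), std::ref(result)));
    running = false;
    t.join();
    return result.load();
}

int runWithLambda(int v) {
    std::atomic<bool> running{true};
    std::atomic<int> result{-1};
    myStruct data{v};
    // Capture the struct by value and the shared state by reference.
    std::thread t([data, &running, &result] { threadFunc(data, running, result); });
    running = false;
    t.join();
    return result.load();
}
```

With bind, forgetting std::ref would silently copy the flag into the bind object. With the lambda, the by-value or by-reference choice is spelled out in the capture list, which some find easier to review.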