如何在循环线程中使用 wait_for?

How to use wait_for in a looping thread?

我想要一个线程 (std::thread),它每 60 秒执行一次工作,否则会休眠并在外部请求时立即 returns。 std::jthread 不是一个选项,我仅限于 C++14。

std::condition_variable cv;

//Thread1:
void ThreadWork
{
    while(cv.wait_for(someLock, 60s) == std::cv_status_timeout)
    {
        Work();
    }
    return;
}

//Thread2:
void RequestEnd()
{
    cv.notify_one();
}

这里的想法是,如果 return val 是 std::cv_status_timeout,60 秒过去了,我们正常工作。否则,return.

目前,我从运行时收到一些关于锁的投诉,或者直接 std::terminate

用循环检查原子变量做一些技巧不是我想要的;这是关于线程休眠。

至少我的方向是正确的吗?

由于虚假唤醒,在没有其他条件的情况下使用 condition_variable 是错误的。您至少需要一个 bool 变量。

someLock 应该保护其他条件,它应该锁定在 wait_for

在一个线程中获取的锁应该在同一个线程中释放。 (通常,但并非总是如此,锁是堆栈变量)。

另外由于spurious wakes non-predicate wait_for使用起来不方便,需要重新计算timeout。首选谓词形式,并检查谓词中的条件。

std::condition_variable 是低级原语。它的部分设计是 虚假唤醒 发生;这意味着,有时等待通知的人会收到通知,即使没有人发送通知。

我怀疑是几倍的原因

  1. 大多数 OS 的底层机制都有这种虚假唤醒。因此,如果标准库实现者对最终用户隐藏了它,则它必须编写代码来处理它。

  2. 几乎在 std::condition_variable 的每个用例中,处理比预期移动 faster/slower 的线程的代码最终连接到有效的虚假唤醒代码并与之重叠。因此,如果图书馆为您处理,无论如何您最终都会重复工作。

你的下一个问题是你描述的逻辑有点模糊。计算机中的时间不应被视为绝对时间;在两个不同的线程中没有“在相同的 60 秒间隔内”。

在某些同步之前发生并在该同步之后发生。

我怀疑您可能想要闩锁。锁存器是同步原语(但不如条件变量原语)。想想门或大门上的闩锁。它开始是关闭的,你可以打开它;但是一旦打开就无法再关闭了

在这里,latch 被“打开”意味着“工作线程,停止你无休止的工作”。

struct latch {
  void open_latch() {
    auto l = lock();
    open = true;
    cv.notify_all();
  }
  void wait() const {
    auto l = lock();
    cv.wait(l, [&]{ return open; });
  }
  template<class Rep, class Period>
  bool wait_for(const std::chrono::duration<Rep, Period>& duration) const {
    auto l = lock();
    return cv.wait_for(l, duration, [&]{ return open; });
  }
  template<class Clock, class Period>
  bool wait_until(const std::chrono::time_point<Clock, Period>& when) const {
    auto l = lock();
    return cv.wait_until(l, when, [&]{ return open; });
  }
private:
  auto lock() const { return std::unique_lock<std::mutex>(m); }
  mutable std::mutex m;
  bool open = false;
  std::condition_variable cv;
};

现在您的代码如下所示:

latch l;

Thread1:
void ThreadWork
{
    while(!l.wait_for(60s))
    {
        Work();
    }
    return;
}

Thread2:
void RequestEnd()
{
    l.open_latch();
}

(代码未经测试,但这不是我的第一个牛仔竞技表演)。

这个模式处理了很多事情,包括在任何人等待之前打开闩锁。

如果您希望 X 分钟后发生 X 次工作,我建议使用 wait_until 而不是 wait_for(请注意,如果工作超过 1 分钟,等待时间将会减少接近零时间)。如果您想要在工作之间休息 1 分钟,请使用 wait_for.

几乎所有 std::condition_variable 的使用都有这个三部分系统;互斥体、有效载荷和条件变量。互斥量应该几乎总是保护有效负载(即使是原子的!)并且只保护有效负载。有时有效载荷由两部分组成,例如中止标志和更复杂的数据结构。

使用线程你需要非常精确 您使用条件变量的想法是正确的 如果您尝试在线程开始之前停止该线程,则会错过条件变量通知并且您的线程将永远不会停止。 然后就是条件变量的其他问题,看我的classstate_variable。这个示例非常详细,但它解决了所有这些问题并避免了竞争条件。

#include <chrono>
#include <iostream>
#include <mutex>
#include <future>
#include <condition_variable>

//-----------------------------------------------------------------------------------------------------
// state of the thread that does the waiting
// used for stable starting and stopping

enum class thread_state
{
    idle,
    starting,
    running,
    stopping,
    stopped
};

//-----------------------------------------------------------------------------------------------------
// helper class for use of std::condition_variable, makes code more readable
// and takes into account the pitfalls of condition variables : 
// https://www.modernescpp.com/index.php/c-core-guidelines-be-aware-of-the-traps-of-condition-variables


template<typename T>
class state_variable
{
public:
    state_variable() = delete;
    state_variable(const state_variable&) = delete;
    state_variable(state_variable&&) = delete;
    state_variable& operator=(const state_variable&) = delete;

    explicit state_variable(const T& value) :
        m_value{ value }
    {
    }

    void operator=(const T& value) noexcept
    {
        {
            std::unique_lock<std::mutex> lock(m_value_mutex);
            m_value = value;
        }
        m_value_changed.notify_all();
    }

    // atomic check and set
    T set_if(const T& from_value, const T& to_value) noexcept
    {
        {
            std::unique_lock<std::mutex> lock(m_value_mutex);
            if (m_value != from_value) return from_value;
            m_value = to_value;
        }
        m_value_changed.notify_all();
        return to_value;
    }

    const bool try_wait_for(const T& value, const std::chrono::steady_clock::duration& duration) const noexcept
    {
        auto pred = [this, value] { return (m_value == value); };
        std::unique_lock<std::mutex> lock(m_value_mutex);
        if (pred()) return true;
        return m_value_changed.wait_for(lock, duration, pred);
    }

    void wait_for(const T& value) const
    {
        try_wait_for(value, std::chrono::steady_clock::duration::max());
    }

private:
    // mutables so I could make the const promises on wait 
    // that they wont change the observable state (m_value)
    // of this class.
    mutable std::mutex m_value_mutex;
    mutable std::condition_variable m_value_changed;
    std::atomic<T> m_value;
};

//-----------------------------------------------------------------------------------------------------

class Worker final
{
public:
    template<typename lambda_t>
    Worker(lambda_t lambda, const std::chrono::steady_clock::duration& loop_time) :
        m_state{ thread_state::idle },
        m_looptime{ loop_time },
        m_work{ lambda }
    {
    };

    Worker(const Worker&) = delete;
    Worker(Worker&&) = delete;
    Worker& operator=(const Worker&) = delete;

    void Start()
    {
        if (m_state.set_if(thread_state::idle, thread_state::starting) != thread_state::starting)
        {
            throw std::runtime_error("only an idle Worker can be started");
        }

        // 
        // Note that std::async, and std::thread don't guarantee the thread is active
        // when the call returns!
        // 
        // it is okay to capture "this" because the destructor of the 
        // Worker synchronizes with termination of this thread through the future
        // So the thread will have a shorter life cycle then the worker!
        //
        m_future = std::async(std::launch::async, [this]
        {
            // Set indication that the thread has really started.
            m_state = thread_state::running;

            do
            {
                m_work();
        
                // using a statevariable to check for stopping means it can respond 
                // during the one second delay and stop immediately. 
                // this is an advantage over using sleep
            } while (!m_state.try_wait_for(thread_state::stopping, m_looptime));

            m_state = thread_state::stopped;
        });

        // Wait for the thread to have really started
        // this way we have a clear post condition for start
        m_state.wait_for(thread_state::running);
    }

    void Stop()
    {
        // only allow a running Worker to be stopped.
        // in all other states Stop does nothing
        if (m_state.set_if(thread_state::running, thread_state::stopping) == thread_state::stopping)
        {
            // synchronization with stopped state, as set by other thread
            m_state.wait_for(thread_state::stopped);

            // future get is not really needed for synchronization.
            // but if thread threw an exception it's rethrown here 
            m_future.get();
        }
    }

    ~Worker()
    {
        // Automatically stop thread if this hasn't already happened.
        Stop();
    }

private:
    std::future<void> m_future;
    state_variable<thread_state> m_state;
    std::chrono::steady_clock::duration m_looptime;
    std::function<void()> m_work;
};


int main()
{
    auto work = []
    {
        std::cout << ".";
    };

    Worker worker(work, std::chrono::seconds(1)); // make 60 for your case and replace work with lambda or std::function<void()>

    std::cout << "Press enter to stop..." << std::endl;
    std::cout << "Every second Work() will be called, and a '.' be printed" << std::endl;
    std::string input;

    worker.Start();
    std::getline(std::cin,input);

    return 0;

    // destructor of worker takes care thread is stopped nicely before exiting the program!
}

我的建议是,如果可以避免的话,不要使用条件变量。他们有很多陷阱,最后你没有“通知线程”的通用解决方案。

首先让我描述一下一般情况,所以你看,这是怎么回事:

一般来说,你想要

  • 在固定的时间(或预定的时间点)后做某事
  • 接收应用程序定义的“消息”或“事件”,可能由其他线程放入队列
  • 等待资源,例如套接字句柄、文件句柄...

当您的要求发生变化并且您需要一般情况的更多特征时,您的子集问题的条件变量方法无法扩展。

因此,考虑到条件变量出错的风险以及这种方法缺乏灵活性并且不考虑未来变化的事实,我建议直接采用通用解决方案。

正如我在问题下方的评论中发表的那样,您可能必须使用系统特定的 api 或使用一个库,它会为您完成这些工作并为您提供可移植的 API。 (我不定期检查 C++ 在其最新变体中是否添加了这样的功能。)

std::future<void> 似乎可以解决问题。

// Worker thread setup
void Work(std::future<void> killSwitch)
{
    while(killSwitch.wait_for(60s) == std::future_status::timeout)
    {
        // ... Stuff
    }
}

// Controller thread
std::promise<void> killSwitch;
std::thread worker{ Work, std::move(killSwitch.get_future()) };

// ... Stuff

killSwitch.set_value(); // <-- this will break the loop
worker.join();

没有互斥体、条件变量、虚假唤醒、锁或原子。