Problem understanding a code example from the Boost.Fiber library
I am currently trying (for learning purposes) to understand a code example from the Boost.Fiber library: https://www.boost.org/doc/libs/1_71_0/libs/fiber/examples/work_sharing.cpp
// Copyright Nat Goodspeed + Oliver Kowalke 2015.
// Distributed under the Boost Software License, Version 1.0.
// (See accompanying file LICENSE_1_0.txt or copy at
// http://www.boost.org/LICENSE_1_0.txt)
#include <chrono>
#include <condition_variable>
#include <cstddef>
#include <deque>
#include <iomanip>
#include <iostream>
#include <mutex>
#include <sstream>
#include <string>
#include <thread>
#include <boost/assert.hpp>
#include <boost/fiber/all.hpp>
#include <boost/fiber/detail/thread_barrier.hpp>
static std::size_t fiber_count{ 0 };
static std::mutex mtx_count{};
static boost::fibers::condition_variable_any cnd_count{};
typedef std::unique_lock< std::mutex > lock_type;
/*****************************************************************************
* example fiber function
*****************************************************************************/
//[fiber_fn_ws
void whatevah( char me) {
    try {
        std::thread::id my_thread = std::this_thread::get_id(); /**< get ID of initial thread >*/
        {
            std::ostringstream buffer;
            buffer << "fiber " << me << " started on thread " << my_thread << '\n';
            std::cout << buffer.str() << std::flush;
        }
        for ( unsigned i = 0; i < 10; ++i) { /**< loop ten times >*/
            boost::this_fiber::yield(); /**< yield to other fibers >*/
            std::thread::id new_thread = std::this_thread::get_id(); /**< get ID of current thread >*/
            if ( new_thread != my_thread) { /**< test if fiber was migrated to another thread >*/
                my_thread = new_thread;
                std::ostringstream buffer;
                buffer << "fiber " << me << " switched to thread " << my_thread << '\n';
                std::cout << buffer.str() << std::flush;
            }
        }
    } catch ( ... ) {
    }
    lock_type lk( mtx_count);
    if ( 0 == --fiber_count) { /**< Decrement fiber counter for each completed fiber. >*/
        lk.unlock();
        cnd_count.notify_all(); /**< Notify all fibers waiting on `cnd_count`. >*/
    }
}
//]
/*****************************************************************************
* example thread function
*****************************************************************************/
//[thread_fn_ws
void thread( boost::fibers::detail::thread_barrier * b) {
    std::ostringstream buffer;
    buffer << "thread started " << std::this_thread::get_id() << std::endl;
    std::cout << buffer.str() << std::flush;
    boost::fibers::use_scheduling_algorithm< boost::fibers::algo::shared_work >(); /**<
        Install the scheduling algorithm `boost::fibers::algo::shared_work` in order to
        join the work sharing.
    >*/
    b->wait(); /**< sync with other threads: allow them to start processing >*/
    lock_type lk( mtx_count);
    cnd_count.wait( lk, [](){ return 0 == fiber_count; } ); /**<
        Suspend main fiber and resume worker fibers in the meanwhile.
        Main fiber gets resumed (e.g returns from `condition_variable_any::wait()`)
        if all worker fibers are complete.
    >*/
    BOOST_ASSERT( 0 == fiber_count);
}
//]
/*****************************************************************************
* main()
*****************************************************************************/
int main( int argc, char *argv[]) {
    std::cout << "main thread started " << std::this_thread::get_id() << std::endl;
//[main_ws
    boost::fibers::use_scheduling_algorithm< boost::fibers::algo::shared_work >(); /*<
        Install the scheduling algorithm `boost::fibers::algo::shared_work` in the main thread
        too, so each new fiber gets launched into the shared pool.
    >*/
    for ( char c : std::string("abcdefghijklmnopqrstuvwxyz")) { /*<
        Launch a number of worker fibers; each worker fiber picks up a character
        that is passed as parameter to fiber-function `whatevah`.
        Each worker fiber gets detached.
    >*/
        boost::fibers::fiber([c](){ whatevah( c); }).detach();
        ++fiber_count; /*< Increment fiber counter for each new fiber. >*/
    }
    boost::fibers::detail::thread_barrier b( 4);
    std::thread threads[] = { /*<
        Launch a couple of threads that join the work sharing.
    >*/
        std::thread( thread, & b),
        std::thread( thread, & b),
        std::thread( thread, & b)
    };
    b.wait(); /*< sync with other threads: allow them to start processing >*/
    {
        lock_type/*< `lock_type` is typedef'ed as __unique_lock__< [@http://en.cppreference.com/w/cpp/thread/mutex `std::mutex`] > >*/ lk( mtx_count);
        cnd_count.wait( lk, [](){ return 0 == fiber_count; } ); /*<
            Suspend main fiber and resume worker fibers in the meanwhile.
            Main fiber gets resumed (e.g returns from `condition_variable_any::wait()`)
            if all worker fibers are complete.
        >*/
    } /*<
        Releasing lock of mtx_count is required before joining the threads, otherwise
        the other threads would be blocked inside condition_variable::wait() and
        would never return (deadlock).
    >*/
    BOOST_ASSERT( 0 == fiber_count);
    for ( std::thread & t : threads) { /*< wait for threads to terminate >*/
        t.join();
    }
//]
    std::cout << "done." << std::endl;
    return EXIT_SUCCESS;
}
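I cannot understand how the fibers keep executing on different threads while all the threads are waiting for notify_all(), since every thread is blocked in the wait call: cnd_count.wait( lk, [](){ return 0 == fiber_count; } ).
If they are all blocked in wait, how can the fibers possibly continue to execute? I thought fibers were executed by threads with the help of each thread's own scheduling manager. I have read that threads are just the execution units that fibers use to run their own callables or functions. So why do all the fibers keep running when all the threads are blocked?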
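See: https://www.boost.org/doc/libs/1_71_0/libs/fiber/doc/html/fiber/scheduling.html
The threads are not actually blocked. cnd_count is a boost::fibers::condition_variable_any, so cnd_count.wait suspends only the calling fiber (each thread's main fiber), not the OS thread. Whenever the fiber running on a thread is suspended, that thread's scheduler runs the next ready fiber, so whatevah executes while cnd_count.wait is pending.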
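You can try removing the boost::fibers::use_scheduling_algorithm< boost::fibers::algo::shared_work >(); line from thread_fn_ws (the thread function). You will find that all the fibers then run on the main thread: without that line the other threads do not install the shared scheduler and keep the default one, which never takes fibers from the shared queue, so no fibers get run on them.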