Tasks on asio::strand are running on a single thread
I modified an asio strand example, using the standalone version of the library, from 4a here:
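#include <iostream>
#include <asio.hpp>
#include <chrono>
#include <functional> // std::bind, std::ref
#include <future>
#include <memory>     // std::unique_ptr, std::make_unique
#include <mutex>
#include <thread>
#include <vector>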
using namespace std::chrono_literals;
namespace util
{
    static std::mutex s_mtx_print;
    // Default argument value
    // https://en.cppreference.com/w/cpp/language/default_arguments
    template <typename... Args>
    void sync_print(const bool log_thread_id, Args &&... args)
    {
        std::lock_guard<std::mutex> print_lock(s_mtx_print);
        if (log_thread_id)
        {
            std::cout << "[" << std::this_thread::get_id() << "] ";
        }
        (std::cout << ... << args) << '\n';
    }
}

void Worker(std::unique_ptr<asio::io_service> &ios)
{
    util::sync_print(true, " Started...");
    if(ios) {ios->run();}
    util::sync_print(true, " End");
}

void PrintNum(int n)
{
    std::cout << "[" << std::this_thread::get_id() << "] " << n << '\n';
    std::this_thread::sleep_for(300ms);
}

void OrderedInvocation(std::unique_ptr<asio::io_service::strand> &up_strand)
{
    if(up_strand)
    {
        up_strand->post(std::bind(&PrintNum, 1));
        up_strand->post(std::bind(&PrintNum, 2));
        up_strand->post(std::bind(&PrintNum, 3));
        up_strand->post(std::bind(&PrintNum, 4));
        up_strand->post(std::bind(&PrintNum, 5));
        up_strand->post(std::bind(&PrintNum, 6));
        up_strand->post(std::bind(&PrintNum, 7));
        up_strand->post(std::bind(&PrintNum, 8));
        up_strand->post(std::bind(&PrintNum, 9));
    }
    else{
        std::cerr << "Invalid strand" << '\n';
    }
}

int main()
{
    util::sync_print(true, "section 4 started ...");
    auto up_ios = std::make_unique<asio::io_service>();
    auto up_work = std::make_unique<asio::io_service::work>(*up_ios);
    auto up_strand = std::make_unique<asio::io_service::strand>(*up_ios);
    std::vector<std::future<void>> tasks;
    constexpr int NUM_TASK = 3;
    for(int i = 0; i< NUM_TASK; ++i)
    {
        tasks.push_back(std::async(std::launch::async, &Worker, std::ref(up_ios)));
    }
    std::cout << "Task size " << tasks.size() << '\n';
    std::this_thread::sleep_for(500ms);
    OrderedInvocation(up_strand);
    up_work.reset();
    for(auto &t: tasks){ t.get(); }
    return 0;
}
The problem is that when I run the code, the function PrintNum appears to run on only a single thread, as the console output is:
[140180645058368] section 4 started ...
Task size 3
[140180610144000] Started...
[140180626929408] Started...
[140180618536704] Started...
[140180610144000] 1
[140180610144000] 2
[140180610144000] 3
[140180610144000] 4
[140180610144000] 5
[140180610144000] 6
[140180610144000] 7
[140180610144000] 8
[140180610144000] 9
[140180610144000] End
[140180626929408] End
[140180618536704] End
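My question is: do I need to configure the strand so that the tasks are spread across all threads? Or am I missing something here?
[EDIT]
Ideally, the output should look something like this: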
[00154F88] The program will exit when all work has finished.
[001532B0] Thread Start
[00154FB0] Thread Start
[001532B0] x: 1
[00154FB0] x: 2
[001532B0] x: 3
[00154FB0] x: 4
[001532B0] x: 5
[00154FB0] Thread Finish
[001532B0] Thread Finish
Press any key to continue . . .
In the expected output, threads 00154FB0 and 001532B0 both executed PrintNum(), but in the modified version only one thread executed PrintNum().
If the strand is not used, the output is:
[140565152012096] section 4 started ...
[140565133883136] Started...
Task size 3
[140565117097728] Started...
[140565125490432] Started...
[[140565133883136] [140565117097728]] 12
3
[140565133883136] [4
[140565117097728140565125490432] 6
] 5
[140565133883136] 7
[140565125490432] 8
[140565117097728] 9
[140565125490432] End
[140565117097728] End
[140565133883136] End
Thanks.
Here is the CPU information of the machine I am using:
$ lscpu
Thread(s) per core: 1
Core(s) per socket: 4
Socket(s): 1
The OS is Ubuntu 18.04.
That is exactly the purpose of a strand:
A strand is defined as a strictly sequential invocation of event handlers (i.e. no concurrent invocation). Use of strands allows execution of code in a multithreaded program without the need for explicit locking (e.g. using mutexes).
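If you want parallel invocation, you need to remove the strand, post() directly to the io_service, and call io_service::run from multiple threads (which you are already doing).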
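To make the difference concrete, here is a minimal sketch that uses only the C++ standard library rather than Asio. ThreadPool and ToyStrand are hypothetical names invented for this illustration; they are a toy model of "several threads calling run()" and of a strand, not Asio's actual implementation. peak_concurrency reports how many handlers were ever observed running at the same time.

```cpp
#include <atomic>
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <deque>
#include <functional>
#include <mutex>
#include <thread>
#include <vector>

using Task = std::function<void()>;

// Hypothetical stand-in for an io_service plus the threads that call run().
class ThreadPool {
public:
    explicit ThreadPool(unsigned n) {
        for (unsigned i = 0; i < n; ++i) workers_.emplace_back([this] { loop(); });
    }
    ~ThreadPool() { join(); }
    void post(Task t) {
        { std::lock_guard<std::mutex> lk(m_); queue_.push_back(std::move(t)); }
        cv_.notify_one();
    }
    // Lets the workers finish everything already queued, then joins them.
    void join() {
        { std::lock_guard<std::mutex> lk(m_); stop_ = true; }
        cv_.notify_all();
        for (auto &w : workers_) if (w.joinable()) w.join();
    }
private:
    void loop() {
        for (;;) {
            Task t;
            {
                std::unique_lock<std::mutex> lk(m_);
                cv_.wait(lk, [this] { return stop_ || !queue_.empty(); });
                if (queue_.empty()) return;  // stop requested and nothing left
                t = std::move(queue_.front());
                queue_.pop_front();
            }
            t();  // run the task outside the lock
        }
    }
    std::mutex m_;
    std::condition_variable cv_;
    std::deque<Task> queue_;
    bool stop_ = false;
    std::vector<std::thread> workers_;
};

// Toy strand: at most one drain() is scheduled at a time, so handlers
// posted through it never overlap, whichever worker picks them up.
class ToyStrand {
public:
    explicit ToyStrand(ThreadPool &pool) : pool_(pool) {}
    void post(Task t) {
        bool start = false;
        {
            std::lock_guard<std::mutex> lk(m_);
            pending_.push_back(std::move(t));
            if (!draining_) { draining_ = true; start = true; }
        }
        if (start) pool_.post([this] { drain(); });
    }
private:
    void drain() {
        for (;;) {
            Task t;
            {
                std::lock_guard<std::mutex> lk(m_);
                if (pending_.empty()) { draining_ = false; return; }
                t = std::move(pending_.front());
                pending_.pop_front();
            }
            t();
        }
    }
    ThreadPool &pool_;
    std::mutex m_;
    std::deque<Task> pending_;
    bool draining_ = false;
};

// Posts n_tasks sleeping handlers and returns the highest number that
// were observed running simultaneously.
int peak_concurrency(bool use_strand, int n_tasks, unsigned n_threads) {
    std::atomic<int> active{0}, peak{0};
    ThreadPool pool(n_threads);
    ToyStrand strand(pool);
    auto handler = [&active, &peak] {
        int now = ++active;
        int seen = peak.load();
        while (now > seen && !peak.compare_exchange_weak(seen, now)) {}
        std::this_thread::sleep_for(std::chrono::milliseconds(20));
        --active;
    };
    for (int i = 0; i < n_tasks; ++i) {
        if (use_strand) strand.post(handler);
        else pool.post(handler);
    }
    pool.join();  // every handler has finished once this returns
    return peak.load();
}
```

Calling peak_concurrency(true, 9, 3) returns 1 in this model, because the toy strand never lets two handlers overlap. Calling peak_concurrency(false, 9, 3) can return anything from 1 to 3, depending on how the scheduler distributes the handlers. Note also that in this toy model, handlers queued back to back are all drained by whichever worker picked up the strand, which resembles the single thread id in the question's output.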
An unrelated note: there is no point in passing unique pointers around; make your life easier and just pass raw pointers or references.
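As a sketch of that last suggestion, the worker below takes a plain reference while ownership stays with the caller. FakeService is a hypothetical stand-in for asio::io_service, used only so that the example compiles without Asio; run_workers is likewise an illustrative name.

```cpp
#include <atomic>
#include <cassert>
#include <functional>
#include <future>
#include <memory>
#include <vector>

// Hypothetical stand-in for asio::io_service (not an Asio type).
struct FakeService {
    std::atomic<int> run_calls{0};
    void run() { ++run_calls; }
};

// Takes a plain reference: the worker uses the service but does not own it,
// so there is no null check and no unique_ptr in the signature.
void Worker(FakeService &svc) { svc.run(); }

// Starts n workers on the same service and returns how often run() was called.
int run_workers(int n) {
    auto owner = std::make_unique<FakeService>();  // ownership stays here
    std::vector<std::future<void>> tasks;
    for (int i = 0; i < n; ++i)
        tasks.push_back(std::async(std::launch::async, Worker, std::ref(*owner)));
    for (auto &t : tasks) t.get();  // all workers finish before owner is destroyed
    return owner->run_calls.load();
}
```

The only requirement is that the owning object outlives every worker, which the loop over t.get() ensures here.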
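This might be a bit late. However, I ran into the same issue while following the same example as above. It turns out that the current way of using a strand is a bit different. Here is my modification of the original code: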
#include <boost/asio/io_context.hpp>
#include <boost/asio/strand.hpp>
#include <boost/asio/post.hpp>
#include <boost/asio/executor_work_guard.hpp>
#include <functional> // std::bind
#include <memory>
#include <mutex>
#include <thread>
#include <chrono>
#include <vector>
#include <iostream>
namespace asio = boost::asio;
std::mutex global_stream_lock;

void
worker_thread(std::shared_ptr<asio::io_context> ioc) {
    global_stream_lock.lock();
    std::cout << "[" << std::this_thread::get_id() << "] Thread start"
              << std::endl;
    global_stream_lock.unlock();
    ioc->run();
    global_stream_lock.lock();
    std::cout << "[" << std::this_thread::get_id() << "] Thread finished"
              << std::endl;
    global_stream_lock.unlock();
}

void
print_num(int x) {
    std::cout << "[" << std::this_thread::get_id() << "] x = " << x
              << std::endl;
}

int
main() {
    auto ioc = std::make_shared<asio::io_context>();
    auto strand = asio::make_strand(*ioc);
    auto work = asio::make_work_guard(*ioc);
    global_stream_lock.lock();
    std::cout << "[" << std::this_thread::get_id()
              << "] This thread will exit when all work is finished "
              << std::endl;
    global_stream_lock.unlock();
    std::vector<std::thread> thread_group;
    for (int i = 0; i < 4; ++i) {
        thread_group.emplace_back(std::bind(worker_thread, ioc));
    }
    for (int i = 0; i < 4; ++i) {
        std::this_thread::sleep_for(std::chrono::milliseconds(100));
        asio::post(strand, std::bind(print_num, 2 * i + 1));
        asio::post(strand, std::bind(print_num, 2 * i + 2));
    }
    work.reset();
    for (auto &t : thread_group) {
        t.join();
    }
}
This produces the following output:
[139877509977920] This thread will exit when all work is finished
[139877509973568] Thread start
[139877501580864] Thread start
[139877493188160] Thread start
[139877484795456] Thread start
[139877509973568] x = 1
[139877509973568] x = 2
[139877493188160] x = 3
[139877493188160] x = 4
[139877501580864] x = 5
[139877501580864] x = 6
[139877484795456] x = 7
[139877484795456] x = 8
[139877509973568] Thread finished
[139877493188160] Thread finished
[139877484795456] Thread finished
[139877501580864] Thread finished