Boost:asio 和多线程中的异步

Boost:asio and async in multi-threading

我需要调用方法,这是对远程服务器的请求。之后我想等待一个答案,等待不会被其他异步阻塞function/objects(例如定时器)。

方法got_response(...) 告诉用户他从远程服务器得到了答案,方法还获取了我们作为答案得到的条目数据。下面我得到了我的解决方案,但有时可以在单线程中调用计时器,这将导致方法 got_response() 挂起。

如何在其他线程中调用定时器来保证答案模拟。我的问题还有其他解决方案吗?

#include <iostream>
#include <boost/asio.hpp>
#include <future>
#include <thread>
#include <vector>
using namespace std;

namespace io = boost::asio;

struct Reply
{
    atomic<bool> ready;
    atomic<int> result;

    future<void> future_result;

    Reply()
    {
        ready = false;
        result = 0;
    }

    void call()
    {
        cout << "retry called!" << endl;
        future_result = async([&]()
                              {
                                  while (!ready)
                                  {
                                      this_thread::yield();
                                  }
                              });
    }

    int get()
    {
        future_result.wait();
        return result.load();
    }

    void got_response(int res)
    {
        result = res;
        ready = true;
    }
};

int main()
{
    Reply reply;
    reply.call();

    io::io_context context(4);

    io::steady_timer timer1(context, std::chrono::seconds(2));
    timer1.async_wait([&](const boost::system::error_code &ec)
                      { cout << "timer 1, thread name: " << this_thread::get_id() << endl; });

    io::steady_timer timer2(context, std::chrono::seconds(3));
    timer2.async_wait([&](const boost::system::error_code &ec)
                      {
                          cout << "timer 2, thread name: " << this_thread::get_id() << endl;
                          cout << reply.get() << endl;
                      });

    io::steady_timer timer3(context, std::chrono::seconds(10));
    timer3.async_wait([&](const boost::system::error_code &ec)
                      {
                          cout << "timer 3, thread name: " << this_thread::get_id() << endl;
                          reply.got_response(1337);
                      });

    vector<thread> threads;
    auto count = 2;

    for (int n = 0; n < count; ++n)
    {
        threads.emplace_back([&]
                             { context.run(); });
    }

    for (auto &th : threads)
    {
        th.join();
    }
}

结果:

retry called!
timer 1, thread name: 140712511198784
timer 2, thread name: 140712519591488
timer 3, thread name: 140712511198784
1337

哇。这在几个层面上过于复杂。

  • futures 可以输入 return 值(这实际上是 future 在同步原语上的全部意义)

  • fturue 可以用值表示就绪,无需将就绪复制到bool,然后将结果复制到某处

  • 这让我很困惑:

     int get()
     {
         _fut.wait();
         return _result.load();
     }
    

    等待未来,然后 return 就是 _result,你知道你发明 _ready 的目的是什么吗?

  • 你知道 std::async 不是 Boost ASIO 的一部分吗?事实上,它不能很好地工作,因为正如您正确注意到的那样,它引入了(未指定数量的)线程。总的来说,我的建议是不要使用 std::async(很难正确使用),当然也不要使用 ASIO

  • 当您看到相同的变量名称 var1、var2、var3 时,是时候重构您的代码了(如果它包含数据成员,则重构为函数或 类):

     std::deque<io::steady_timer> timers;
    
     for (int i = 1; i <= 3; ++i) {
         auto& timer = timers.emplace_back(context, std::chrono::seconds(1+i));
         timer.async_wait([i](error_code ec) {
             std::cout << "timer " << i
                       << ", thread name: " << std::this_thread::get_id()
                       << std::endl;
         });
     }
    
  • 而不是线程向量,考虑 boost::thread_group 或者 boost::asio::thread_pool.

  • 如果手动运行IO线程,记得处理异常(Should the exception thrown by boost::asio::io_service::run() be caught?),所以

     boost::thread_group threads;
     for (int n = 0; n < 2; ++n) {
         threads.create_thread([&] { context.run(); });
     }
    
     threads.join_all();
    

    或者确实

     io::thread_pool context(2);
     context.join();
    
  • 这是非常效率低下

     while (!_ready) {
         std::this_thread::yield();
     }
    

    只需设置未来值以表示它已准备就绪:

  • using namespace std 通常不是一个好主意 (Why is "using namespace std;" considered bad practice?)

演示

这是我对问题代码的扩展但简化的看法:

Live On Coliru

#include <boost/asio.hpp>
#include <deque>
#include <future>
#include <iostream>
#include <thread>
namespace io = boost::asio;
using namespace std::chrono_literals;
using boost::system::error_code;

// not very useful in practice, but for debug output in main
std::ostream& debug(error_code);
template <typename Fut> bool is_ready(Fut const& fut) {
    return fut.wait_for(0s) == std::future_status::ready;
}

int main() {
    std::promise<int>  reply;
    std::shared_future got_value = reply.get_future();

    io::thread_pool              context(2);
    std::deque<io::steady_timer> timers;

    for (int i = 1; i <= 10; ++i) {
        timers //
            .emplace_back(context, i * 1s)
            .async_wait([&got_value](error_code ec) {
                if (is_ready(got_value))
                    debug(ec) << " Reply:" << got_value.get() << std::endl;
                else
                    debug(ec) << " (reply not ready)" << std::endl;
            });
    }

    timers //
        .emplace_back(context, 4'500ms)
        .async_wait([&reply](error_code ec) {
            debug(ec) << " setting value" << std::endl;
            reply.set_value(1337);
        });

    context.join();
}
    
int friendly_thread_id() {
    return std::hash<std::thread::id>{}(std::this_thread::get_id()) % 256;
}

#include <iomanip>
std::ostream& debug(error_code ec) {
    auto        now           = std::chrono::system_clock::now;
    static auto program_start = now();
    return std::cout //
        << ((now() - program_start) / 1ms) << "ms\t"
        << "thread:" << std::hex << std::setfill('0') << std::showbase
        << std::setw(2) << friendly_thread_id() << std::dec << " ";
}


#include <iomanip>
std::ostream& debug(error_code ec) {
    auto        now           = std::chrono::system_clock::now;
    static auto program_start = now();
    return std::cout //
        << ((now() - program_start) / 1ms) << "ms\t"
        << "thread:" << std::hex << std::setfill('0') << std::showbase
        << std::setw(2) << pretty_thread_id() << std::dec << " ";
}

版画

0ms     thread:0x5f  (reply not ready)
999ms   thread:0xf3  (reply not ready)
1999ms  thread:0x5f  (reply not ready)
2999ms  thread:0x5f  (reply not ready)
3499ms  thread:0xf3  setting value
3999ms  thread:0x5f  Reply:1337
4999ms  thread:0xf3  Reply:1337
5999ms  thread:0xf3  Reply:1337
6999ms  thread:0xf3  Reply:1337
7999ms  thread:0xf3  Reply:1337
8999ms  thread:0xf3  Reply:1337