如何使用boost::beast,下载文件无阻塞且有响应

How to use boost::beast, download a file no blocking and with responses

我从 this example 开始,所以不会 post 所有代码。我的 objective 是在不阻塞我的主线程的情况下下载一个大文件。第二个 objective 是获取通知,以便我可以更新进度条。我确实让代码以几种方式工作。首先是 ioc.run(); 让它开始工作,我下载了文件。但是无论如何我都找不到在不阻塞的情况下启动会话。

第二种方法我可以将呼叫降低到 http::async_read_some 并且呼叫有效但我无法得到我可以使用的响应。我不知道是否有办法传递捕获的 lambda。

#if 0..#else..#endif 切换方法。我确定有一个简单的方法,但我看不到它。当我让它工作时,我会清理代码,比如设置本地文件名。谢谢。

    std::size_t on_read_some(boost::system::error_code ec, std::size_t bytes_transferred)
    {
        if (ec);//deal with it... 
        if (!bValidConnection) {
            std::string_view view((const char*)buffer_.data().data(), bytes_transferred);
            auto pos = view.find("Content-Length:");
            if (pos == std::string_view::npos)
                ;//error
            file_size = std::stoi(view.substr(pos+sizeof("Content-Length:")).data());
            if (!file_size)
                ;//error
            bValidConnection = true;
        }
        else {
            file_pos += bytes_transferred;
            response_call(ec, file_pos);
        }
#if 0
        std::cout << "in on_read_some caller\n";
        http::async_read_some(stream_, buffer_, file_parser_, std::bind(
            response_call,
            std::placeholders::_1,
            std::placeholders::_2));
#else
        std::cout << "in on_read_some inner\n";
        http::async_read_some(stream_, buffer_, file_parser_, std::bind(
            &session::on_read_some,
            shared_from_this(),
            std::placeholders::_1,
            std::placeholders::_2));
#endif
        return buffer_.size();
    }

主要的,乱七八糟的但是.....

struct lambda_type {
    bool bDone = false;
    void operator ()(const boost::system::error_code ec, std::size_t bytes_transferred) {
        ;
    }
};
int main(int argc, char** argv)
{
    auto const host = "reserveanalyst.com";
    auto const port = "443";
    auto const target = "/downloads/demo.msi";
    int version = argc == 5 && !std::strcmp("1.0", argv[4]) ? 10 : 11;

    boost::asio::io_context ioc;
    ssl::context ctx{ ssl::context::sslv23_client };

    load_root_certificates(ctx);
    //ctx.load_verify_file("ca.pem");

    auto so = std::make_shared<session>(ioc, ctx);
    so->run(host, port, target, version);

    bool bDone = false;
    auto const lambda = [](const boost::system::error_code ec, std::size_t bytes_transferred) {
        std::cout << "data lambda bytes: " << bytes_transferred << " er: " << ec.message() << std::endl;
    };

    lambda_type lambda2;
    so->set_response_call(lambda);
    ioc.run();

    std::cout << "not in ioc.run()!!!!!!!!" << std::endl;

    so->async_read_some(lambda);

    //pseudo message pump when working.........
    for (;;) {
        std::this_thread::sleep_for(250ms);
        std::cout << "time" << std::endl;
    }
    return EXIT_SUCCESS;
}

以及我添加到 class session

的内容
class session : public std::enable_shared_from_this<session>
{
        using response_call_type = void(*)(boost::system::error_code ec, std::size_t bytes_transferred);
        http::response_parser<http::file_body> file_parser_;
        response_call_type response_call;
        //
        bool bValidConnection = false;
        std::size_t file_pos = 0;
        std::size_t file_size = 0;
    
    public:
        auto& get_result() { return res_; }
        auto& get_buffer() { return buffer_; }
        void set_response_call(response_call_type the_call) { response_call = the_call; }

我强烈建议不要使用 low-level [async_]read_some 函数,而应按预期使用 http::[async_]read http::response_parser<http::buffer_body>

我确实有一个这样的例子 - 由于它还使用 Boost Process 同时解压缩正文数据,这有点复杂,但无论如何它应该告诉你如何使用它:

我想如果有更完整的代码,我可以根据您的具体示例对其进行调整,但也许上面的代码已经足够好了?另请参阅 libs/beast/example/doc/http_examples.hpp 中的“中继 HTTP 消息”,我将其用作“灵感”。

Caution: the buffer arithmetic is not intuitive. I think this is unfortunate and should not have been necessary, so pay (very) close attention to these samples for exactly how that's done.

这是我最终想出的用于带有消息泵的应用程序的方法。我正在为我的应用程序使用 MFC。对于像我这样的 asio 新手,这是必看的视频。

CppCon 2016 Michael Caisse Asynchronous IO with BoostAsio

有几种方法可以做到这一点 运行。有一个用于打开 non-blocking 的定义。这是针对下载大文件并显示带有取消按钮的进度对话框的情况。要启用取消按钮,请将 bool quit 设置为 true。评论 #define NO_BLOCKING 在消息泵等待时下载一个小文件。

我认为我使用std::thread reader_thread;的方式在这个应用中是合适的。我一次不会下载多个文件。我已经开始将其插入我的应用程序,一切看起来都不错。

关于传递 lambda 的问题,@Yakk - Adam Nevraumont 非常有帮助。阅读 his answer here 可以更清楚地了解使用带捕获的 lambda。

如果删除了与 libcripto 和 libssl 的链接,此代码应该可以编译并且 运行 没问题。我正在使用 libcripto-3 这是 root_certificates.hpp 的副本。我检查了一下,这个版本工作正常。

完整代码。

// Copyright (c) 2016-2017 Vinnie Falco (vinnie dot falco at gmail dot com)
// Official repository: https://github.com/boostorg/beast
// Example: HTTP SSL client, asynchronous downloads

#include <boost/beast/core.hpp>
#include <boost/beast/http.hpp>
#include <boost/beast/version.hpp>
#include <boost/asio/connect.hpp>
#include <boost/asio/ip/tcp.hpp>
#include <boost/asio/ssl/error.hpp>
#include <boost/asio/ssl/stream.hpp>
#include <cstdlib>
#include <functional>
#include <iostream>
#include <fstream>
#include <memory>
#include <string>
#include <chrono>
#include <thread>

//don't need the cert in a file method or use 
    //don't need the cert in a file method or use 
#include "root_certificates.hpp"
#pragma comment(lib, "C:\cpp\openssl-master\libcrypto.lib")
#pragma comment(lib, "C:\cpp\openssl-master\libssl.lib")

using tcp = boost::asio::ip::tcp;       // from <boost/asio/ip/tcp.hpp>
namespace ssl = boost::asio::ssl;       // from <boost/asio/ssl.hpp>
namespace http = boost::beast::http;    // from <boost/beast/http.hpp>

void session_fail(boost::system::error_code ec, char const* what) {
    std::cerr << what << ": " << ec.message() << "\n";
}

class session : public std::enable_shared_from_this<session>
{
public:
    enum responses {
        resp_null,
        resp_ok,
        resp_done,
    };
    using response_call_type = std::function< void(responses, std::size_t)>;
protected:
    tcp::resolver resolver_;
    ssl::stream<tcp::socket> stream_;
    boost::beast::flat_buffer buffer_; // (Must persist between reads)
    http::request<http::empty_body> req_;
    http::response<http::string_body> res_;
    boost::beast::http::request_parser<boost::beast::http::string_body> header_parser_;
    http::response_parser<http::file_body> file_parser_;
    response_call_type response_call;
    boost::system::error_code file_open_ec;
    //
    std::size_t file_pos = 0;
    std::size_t file_size = 0;

public:
    explicit session(boost::asio::io_context& ioc, ssl::context& ctx, const char* filename)
        : resolver_(ioc)
        , stream_(ioc, ctx)
    {
        file_parser_.body_limit((std::numeric_limits<std::uint64_t>::max)());
        file_parser_.get().body().open(filename, boost::beast::file_mode::write, file_open_ec);
    }
    void run(char const* host, char const* port, char const* target, int version)
    {
        std::cout << "run" << std::endl;
        if (!SSL_set_tlsext_host_name(stream_.native_handle(), host))
        {
            boost::system::error_code ec{ static_cast<int>(::ERR_get_error()), boost::asio::error::get_ssl_category() };
            std::cerr << ec.message() << "\n";
            return;
        }
        // Set up an HTTP GET request message
        req_.version(version);
        req_.method(http::verb::get);
        req_.target(target);
        req_.set(http::field::host, host);
        req_.set(http::field::user_agent, BOOST_BEAST_VERSION_STRING);

        // Look up the domain name
        resolver_.async_resolve(host, port, std::bind(
            &session::on_resolve,
            shared_from_this(),
            std::placeholders::_1,
            std::placeholders::_2));
    }
    void on_resolve(boost::system::error_code ec, tcp::resolver::results_type results)
    {
        std::cout << "on_resolve" << std::endl;
        if (ec)
            return session_fail(ec, "resolve");

        // Make the connection on the IP address we get from a lookup
        boost::asio::async_connect(stream_.next_layer(), results.begin(), results.end(), std::bind(
            &session::on_connect,
            shared_from_this(),
            std::placeholders::_1));
    }
    void on_connect(boost::system::error_code ec)
    {
        std::cout << "on_connect" << std::endl;
        if (ec)
            return session_fail(ec, "connect");

        // Perform the SSL handshake
        stream_.async_handshake(ssl::stream_base::client, std::bind(
            &session::on_handshake,
            shared_from_this(),
            std::placeholders::_1));
    }
    void on_handshake(boost::system::error_code ec)
    {
        std::cout << "on_handshake" << std::endl;
        if (ec)
            return session_fail(ec, "handshake");

        // Send the HTTP request to the remote host
        http::async_write(stream_, req_, std::bind(
            &session::on_write,
            shared_from_this(),
            std::placeholders::_1,
            std::placeholders::_2));
    }
    void on_write(boost::system::error_code ec, std::size_t bytes_transferred)
    {
        std::cout << "on_write" << std::endl;
        if (ec)
            return session_fail(ec, "write");
        if (response_call)
            http::async_read_header(stream_, buffer_, file_parser_, std::bind(
                &session::on_startup,
                shared_from_this(),
                std::placeholders::_1,
                std::placeholders::_2));
        else
            http::async_read_header(stream_, buffer_, file_parser_, std::bind(
                &session::on_read,
                shared_from_this(),
                std::placeholders::_1,
                std::placeholders::_2));
    }
    std::size_t on_startup(boost::system::error_code ec, std::size_t bytes_transferred)
    {
        std::cout << "on_startup: " << bytes_transferred << std::endl;
        std::string_view view((const char*)buffer_.data().data(), bytes_transferred);
        auto pos = view.find("Content-Length:");
        if (pos == std::string_view::npos)
            assert(true);//error
        file_size = std::stoi(view.substr(pos + sizeof("Content-Length:")).data());
        if (!file_size)
            assert(true);//error
        std::cout << "filesize: " << file_size << std::endl;
        http::async_read_some(stream_, buffer_, file_parser_, std::bind(
            &session::on_read_some,
            shared_from_this(),
            std::placeholders::_1,
            std::placeholders::_2));
        return buffer_.size();
    }
    std::size_t on_read_some(boost::system::error_code ec, std::size_t bytes_transferred)
    {
        //std::cout << "on_read_some" << std::endl;
        if (ec) {
            session_fail(ec, "on_read_some");
            return 0;
        }
        file_pos += bytes_transferred;
        if (!bytes_transferred && file_pos) {
            on_shutdown(ec);
            return 0;
        }
        response_call(resp_ok, file_pos);

        //std::cout << "session::on_read_some: " << file_pos << std::endl;
        http::async_read_some(stream_, buffer_, file_parser_, std::bind(
            &session::on_read_some,
            shared_from_this(),
            std::placeholders::_1,
            std::placeholders::_2));
        return buffer_.size();
    }
    std::size_t on_read(boost::system::error_code ec, std::size_t bytes_transferred)
    {
        file_pos += bytes_transferred;
        if (!bytes_transferred && file_pos) {
            on_shutdown(ec);
            return 0;
        }
        std::cout << "on_read: " << bytes_transferred << std::endl;
        http::async_read(stream_, buffer_, file_parser_,
            std::bind(&session::on_read,
                shared_from_this(),
                std::placeholders::_1,
                std::placeholders::_2));
        return buffer_.size();
    }
    void on_shutdown(boost::system::error_code ec)
    {
        std::cout << "on_shutdown" << std::endl;
        if (ec == boost::asio::error::eof) {
            // Rationale:
            // 
            ec.assign(0, ec.category());
        }
        if (response_call)
            response_call(resp_done, 0);
        if (ec)
            return session_fail(ec, "shutdown");
    }
    auto get_file_status() const { return file_open_ec; }
    void set_response_call(response_call_type the_call) { response_call = the_call; }
    std::size_t get_download_size() const { return file_size; }
    //std::string get_content() const { return "test"; }// return response;}
};

#define NO_BLOCKING
int main(int argc, char** argv)
{
    //in a UI app you will need to keep a persistant thread/pool;
    std::thread reader_thread;
    //for an application where this never changes, this can just be put in the session class
    auto const host = "reserveanalyst.com";
    auto const port = "443";
#ifdef NO_BLOCKING // the large file
    auto const target = "/downloads/demo.msi";
#else // the small file
    auto const target = "/server.xml";
#endif
    boost::asio::io_context ioc;
    ssl::context ctx{ ssl::context::sslv23_client };
    load_root_certificates(ctx);
    //end, put in the session class
    auto so = std::make_shared<session>(ioc, ctx, "content.txt");//may be big binary
    so->run(host, port, target, 11);//so->run(target);
    //
    session::responses glb_response = session::resp_null;
    bool test_bool = false; //stand in for 'SendMessage' values
    std::size_t buf_size = 0; //stand in for 'SendMessage' values
#ifdef NO_BLOCKING
    auto static const lambda = [&glb_response, &buf_size](session::responses response, std::size_t bytes_transferred) {
        glb_response = response;
        buf_size = bytes_transferred;
        //drive your progress bar from here in a GUI app
        //sizes = the_beast_object.get_file_size() - size;//because size is what is left
        //cDownloadProgreess.SetPos((LPARAM)(sizes * 100 / the_beast_object.get_file_size()));
    };
    so->set_response_call(lambda);
#else
    ioc.run();
    std::cout << "ioc run exited" << std::endl;
#endif

#ifdef NO_BLOCKING
//    reader_thread.swap(std::thread{ [&ioc]() { ioc.run(); } });
    std::thread new_thread{ [&ioc]() { ioc.run(); } };
    reader_thread.swap(new_thread);
#endif
    bool quit = false; //true: as if a cancel button was pushed; won't finish download
    //pseudo message pump
    for (int i = 0; ; ++i) {

        switch (glb_response) { //ad hoc as if messaged
        case session::responses::resp_ok:
            std::cout << "from sendmessage: " << buf_size << std::endl;
            break;
        case session::responses::resp_done:
            std::cout << "from sendmessage: done" << std::endl;
        }//switch
        glb_response = session::responses::resp_null;
        if (!(i % 10))
            std::cout << "in message pump, stopped: " << std::boolalpha << ioc.stopped() << std::endl;

        std::this_thread::sleep_for(std::chrono::milliseconds(200));
        if (quit && i == 10) //the cancel message
            ioc.stop();
        if (ioc.stopped())//just quit to test join.
            break;
    }
    if (reader_thread.joinable())//in the case a thread was never started
        reader_thread.join();
    std::cout << "exiting, program was quit" << std::endl;

    return EXIT_SUCCESS;
}