std::future 在 Boost UDP 套接字异步接收操作中不起作用

std::future not working in Boost UDP socket async receive operation

我正在用 Boost 编写一个 UDP 服务器应用程序,它应该在套接字上侦听 5 秒,如果在这 5 秒内没有收到数据报,则继续做其他事情。

受到的启发,我决定尝试基于std::future的解决方案。

问题是对wait_for()的调用总是超时,就好像没有收到数据一样。但是,如果我在超时后执行的行上设置断点并检查变量,我会看到缓冲区包含接收到的数据报,并且 remote_endpoint 对象包含客户端的地址。换句话说,套接字接收按预期工作,但 std::future 不会触发。为什么?

这是我的测试服务器代码:

#include <future>
#include <boost/asio.hpp>
#include <boost/asio/use_future.hpp>

using boost::asio::ip::udp;

int main()
{
    try
    {
        boost::asio::io_service io_service;
        udp::socket socket(io_service, udp::endpoint(udp::v4(), 10000));
        char recv_buf[8];

        for (;;)
        {
            ZeroMemory(recv_buf, 8);
            udp::endpoint remote_endpoint;
            std::future<std::size_t> recv_length;

            recv_length = socket.async_receive_from(
                boost::asio::buffer(recv_buf), 
                remote_endpoint, 
                0, 
                boost::asio::use_future);

            if (recv_length.wait_for(
                std::chrono::seconds(5)) == std::future_status::timeout)
            {
                printf("time out. Nothing received.\n");
            }
            else
            {
                printf("received something: %s\n", recv_buf);
            }
        }
    }
    catch (std::exception& e)
    {
        printf("Error: %s\n", e.what());
    }
    return 0;
}

我已经为此苦苦思索了一段时间,如有任何帮助,我们将不胜感激。我在 Windows 10 和 Visual Studio 2015.

这是我的测试客户端代码(在 python 中,抱歉)。

import socket
import time

HOST = "server"           # The remote host
PORT = 10000              # The same port as used by the server
with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s:
    address = socket.getaddrinfo(HOST, PORT)[0][-1]

    while True:
        s.sendto("ping[=12=]", address)
        time.sleep(1)

您没有调用 io_service 对象的 run 方法。因此 asio 不是 运行。请创建一个调用 run 方法的线程,然后重试。

你正在做的是异步和同步操作的混合,这是行不通的:

  • 您正在使用异步 async_receive_from 操作,它将在 asio 事件循环 (io_service) 上 运行 并在收到内容时完成。在正常情况下,这将在完成时调用回调,如果你给它未来,它将完成未来。请注意,这将发生在调用 io_service.run()
  • 的线程中
  • 您正在以同步方式使用未来。这将阻塞当前线程,直到 future 完成。
  • 如果未来是从同一个线程中实现的,而不是你为了等待它而阻塞的线程,那么它显然永远无法实现。

解决此问题的可能步骤:

  • 只需使用带有超时的 asio 阻塞操作。这些正是您希望在未来实现的事情。
  • 使用 futures then() 方法来附加延续而不是阻塞。不适用于旧的 stdlib futures,因为它是 C++17 扩展。但是boost期货可以做到。您仍然需要在主线程上调用 io_service.run() 并将您的程序拆分为回调前后的阶段。
  • 运行 asio 和它在后台线程中的事件循环,如果你需要的话

对于异步操作,底层 I/O 和完成处理程序的执行是离散的步骤。在这种情况下,I/O 已完成,但用户代码从未 运行 发送 io_service,因此永远不会执行设置 recv_length 值的完成处理程序。要解决此问题,运行 io_service.


有一些细节有助于观察:

  • 当启动异步操作时,如果它可以无阻塞地完成,那么它将这样做并且它的完成处理程序将排队进入 io_service as-if通过 io_service.post()
  • 使用 boost::asio::use_future 时,std::future 的值在异步操作的完成处理程序中设置
  • 发布到 io_service 的处理程序仅在当前调用 poll()poll_one()run()run_one() 成员函数的线程中调用在 io_service

在问题的上下文中,当

recv_length = socket.async_receive_from(
  boost::asio::buffer(recv_buf), 
  remote_endpoint, 
  0, 
  boost::asio::use_future);

已启动并且数据可供读取 (socket.available() > 0),然后 remote_endpointrecv_buffer 都将在启动 async_receive_from() 中填充正确的数据功能。将设置 recv_length 值的完成处理程序发布到 io_service。但是,由于代码不处理 io_service,因此永远不会设置 recv_length 的值。因此,recv_length.wait_for() 将始终导致超时状态。


official futures example 创建一个额外的线程,专用于处理 I/O 服务,并在未处理 I/O 服务的线程中等待 std::future :

// We run the io_service off in its own thread so that it operates
// completely asynchronously with respect to the rest of the program.
boost::asio::io_service io_service;
boost::asio::io_service::work work(io_service);
std::thread thread([&io_service](){ io_service.run(); });

...

std::future<std::size_t> send_length =
  socket.async_send_to(..., boost::asio::use_future);

// Do other things here while the send completes.

send_length.get(); // Blocks until the send is complete. Throws any errors.

io_service.stop();
thread.join();

我找到了解决办法。因此,总结一下,这是需要做的。我的初始代码需要修改 2 次。

(1) 在开头添加 2 行以启动一个单独的线程 io_service 以监视超时(如 Tanner Sansbury 所建议)

boost::asio::io_service::work work(io_service);
std::thread thread([&io_service](){ io_service.run(); });

(2) 在sockettime_out的条件下调用socket.cancel();。如果不取消套接字操作,尽管重新调用 wait_for()(在 Boost 的邮件列表中收到的解决方案),套接字将继续阻塞。

修改后的代码供参考:

#include <future>
#include <boost/asio.hpp>
#include <boost/asio/use_future.hpp>

using boost::asio::ip::udp;

int main()
{
    try
    {
        boost::asio::io_service io_service;
        boost::asio::io_service::work work(io_service);
        std::thread thread([&io_service](){ io_service.run(); });

        udp::socket socket(io_service, udp::endpoint(udp::v4(), 10000));

        char recv_buf[8];

        for (;;)
        {
            ZeroMemory(recv_buf, 8);
            udp::endpoint remote_endpoint;
            std::future<std::size_t> recv_length;

            recv_length = socket.async_receive_from(
                boost::asio::buffer(recv_buf), 
                remote_endpoint, 
                0, 
                boost::asio::use_future);

            if (recv_length.wait_for(
                std::chrono::seconds(5)) == std::future_status::timeout)
            {
                printf("time out. Nothing received.\n");
                socket.cancel();
            }
            else
            {
                printf("received something: %s\n", recv_buf);
            }
        }
    }
    catch (std::exception& e)
    {
        printf("Error: %s\n", e.what());
    }
    return 0;
}

谢谢大家的帮助。