boost::asio 如何从客户端发送字符串数据并将其存储在服务器端进行处理

boost::asio how to send string data from client and store it at server side for processing

我正在尝试使用 boost::asio 库从多个客户端发送原始字符串。我需要在服务器端对字符串进行排序并将其发送回客户端。

void start()
  {
    sock.async_read_some(
        boost::asio::buffer(data),
        boost::bind(&con_handler::handle_read,
                    shared_from_this(),
                    boost::asio::placeholders::error,
                    boost::asio::placeholders::bytes_transferred));

    
    sock.async_write_some(
    boost::asio::buffer(data),
        boost::bind(&con_handler::handle_write, 
            shared_from_this(),
                boost::asio::placeholders::error,
                boost::asio::placeholders::bytes_transferred));
  }

  void handle_read(const boost::system::error_code& err,
                   size_t bytes_transferred)
  {
      std::sort(data.begin(), data.end());
  }

  void handle_write(const boost::system::error_code& err,
               size_t bytes_transferred)
  {
    if (!err) {
    cout<<"server side "<<data<<endl;
    
    } else {
      std::cerr << "err (recv): " << err.message() << std::endl;
      sock.close();
    }
  }

但是,这不起作用。我在打印和客户端上得到空字符串。

请问如何解决这个问题?

缺少很多相关代码,但是

std::sort(data.begin(), data.end());

告诉我 data 可能是 array<char, N>std::vector<char>

It cannot be std::string because you'd have needed asio::dynamic_buffer(data) instead of asio::buffer(data).

现在

  • 如果使用 array<char, N>,那么 end() 迭代器将始终是缓冲区的末尾,无论接收到多少字节。这不是您想要的,因为缓冲区将包含尾随垃圾 - and/or 可能是 NUL 字符,这会弄乱您的输出。

  • 如果使用 vector<char>,那么经典的错误是它的大小为零(它是 empty())所以 buffer(data) 实际上是一个零字节缓冲区并且什么也没有收到,排序什么也不做,什么也没有发送。

无论如何,sort 调用并未执行您描述的操作:

I am trying to use boost::asio library to send raw strings from multiple clients. I need to sort the strings at server side and send it back to clients.

这表明您对单独的“字符串”(可能是单独的文本行?)进行排序。您的代码对字符串中的 个字符 进行排序。不是一回事。

这是一个解决方案的粗略草图,其中服务器从多个客户端累积 _strings(按行),并在连接结束时将排序后的集合写回每个客户端:

Live On Coliru

#include <boost/asio.hpp>
#include <iostream>
#include <iomanip>
#include <deque>

namespace net = boost::asio;
using namespace std::chrono_literals;
using net::ip::tcp;
using boost::system::error_code;

using Executor = net::io_context::executor_type;
using Acceptor = net::basic_socket_acceptor<tcp, Executor>;
using Socket   = net::basic_stream_socket<tcp, Executor>;

struct Server {
    Acceptor _acc;
    Server(Executor ex) : _acc(ex, {{}, 7878}) {
        _acc.listen();
        do_accept();
    }

    void addString(std::string line) {
        std::cerr << "addString: " << std::quoted(line) << std::endl;
        line += '\n';
        _strings.push_back(std::move(line));
    }

    auto getStrings() const { return _strings; }

  private:
    void do_accept()
    {
        _acc.async_accept([this](error_code ec, Socket&& s) {
            std::cerr << "do_accept: " << ec.message() << std::endl;
            if (!ec.failed()) {
                std::cerr << "Accepted " << s.remote_endpoint() << std::endl;
                std::make_shared<Session>(std::move(s), *this)->start();

                do_accept();
            }
        });
    }

    struct Session : std::enable_shared_from_this<Session> {
        Session(Socket&& s, Server& server)
            : _socket(std::move(s))
            , _server(server)
        {
        }

        void start() {
            read_loop();
        }

      private:
        void read_loop()
        {
            async_read_until(_socket, _buf, "\n",
                 [self = this->shared_from_this(),
                  this](error_code ec, size_t bytes) {
                     std::cerr << "read_loop: " << ec.message() << std::endl;
                     if (!ec || ec == net::error::eof) {
                         std::istream is(&_buf);
                         std::string  line;
                         if (getline(is, line)) {
                             _server.addString(std::move(line));
                         }
                     }

                     if (ec == net::error ::eof) {
                         _reply = _server.getStrings();
                         std::sort(_reply.begin(), _reply.end());
                         write_loop();
                     }

                     if (!ec) {
                         read_loop();
                     }
                 });
        }

        void write_loop() {
            if (_reply.empty())
                return;

            async_write(
                _socket, net::buffer(_reply.front()),
                [this, self = shared_from_this()](error_code ec, size_t) {
                    std::cerr << "write_loop: " << ec.message() << std::endl;
                    if (!ec) {
                        _reply.pop_front();
                        write_loop();
                    }
                });
        }

        Socket                  _socket;
        net::streambuf          _buf;
        Server&                 _server;
        std::deque<std::string> _reply;
    };

    std::deque<std::string> _strings;
};

int main() {
    net::io_context io;

    Server s(io.get_executor());

    io.run_for(10s);
}

样本客户:

for a in foo 'bar\nqux' xyz abc
do printf "=== %s\n" "$a"
    echo -e "$a" | netcat -N 127.0.0.1 7878
done

版画

do_accept: Success
Accepted 127.0.0.1:34582
read_loop: End of file
addString: "foo"
write_loop: Success
do_accept: Success
Accepted 127.0.0.1:34584
read_loop: Success
addString: "bar"
read_loop: End of file
addString: "qux"
write_loop: Success
write_loop: Success
write_loop: Success
do_accept: Success
Accepted 127.0.0.1:34586
read_loop: End of file
addString: "xyz"
write_loop: Success
write_loop: Success
write_loop: Success
write_loop: Success
do_accept: Success
Accepted 127.0.0.1:34588
read_loop: End of file
addString: "abc"
write_loop: Success
write_loop: Success
write_loop: Success
write_loop: Success
write_loop: Success

客户端输出:

=== foo
foo
=== bar\nqux
bar
foo
qux
=== xyz
bar
foo
qux
xyz
=== abc
abc
bar
foo
qux
xyz