Boost asio:无法确认文件传输
Boost asio: unable to acknowledge a file transfer
我正在使用 boost asio 通过 TCP 执行文件传输。文件传输有效,但是当我决定通过链接 async_write
(在服务器上)和 async_read_until
(在客户端上)来实现从服务器到客户端的简单确认消息时,我观察到一个奇怪的行为:服务器端不再能正确接收文件。在距离传输结束还差几百个字节时,服务器就不再收到任何字节,因此永远不会调用 async_write
负责确认文件传输。
当我写完文件后在客户端调用 async_read_until
时似乎会发生这种情况。由于某种原因,它影响了当前的文件传输。
客户端实现:
#include "StdAfx.h"
#include <boost/bind.hpp>
#include <boost/thread.hpp>
#include <boost/enable_shared_from_this.hpp>
#include <boost/thread.hpp>
#include "AsyncTCPClient.h"
// Splits iServerIP at ':' into host and port, opens iPath for binary reading,
// queues the request header in mRequest and starts an asynchronous resolve of
// the server address. Returns without starting any asynchronous operation if
// iServerIP contains no ':' or the file cannot be opened.
AsyncTCPClient::AsyncTCPClient(boost::asio::io_service& iIoService, const std::string& iServerIP, const std::string& iPath)
: mResolver(iIoService), mSocket(iIoService)
{
size_t wPos = iServerIP.find(':');
if(wPos==std::string::npos)
{
// No port separator: nothing is logged here.
return;
}
std::string wPortStr = iServerIP.substr(wPos + 1);
std::string wServerIP = iServerIP.substr(0, wPos);
// Opened positioned at the end (ate) so that tellg() below yields the file size.
mSourceFile.open(iPath, std::ios_base::binary | std::ios_base::ate);
if(!mSourceFile)
{
LOG(LOGERROR) << "Failed to open file: " << iPath;
return;
}
size_t wFileSize = mSourceFile.tellg();
// Rewind so that later reads start at the beginning of the file.
mSourceFile.seekg(0);
std::ostream wRequestStream(&mRequest);
// Header layout: "<path>\n<size>\n\n".
wRequestStream << iPath << "\n" << wFileSize << "\n\n";
LOG(LOGINFO) << "File to transfer: " << iPath;
LOG(LOGINFO) << "Filesize: " << wFileSize << " bytes";
tcp::resolver::query wQuery(wServerIP, wPortStr);
mResolver.async_resolve(wQuery, boost::bind(&AsyncTCPClient::HandleResolve, this, boost::asio::placeholders::error, boost::asio::placeholders::iterator));
}
// Empty destructor: no explicit cleanup is performed here.
AsyncTCPClient::~AsyncTCPClient()
{
}
// Resolve completion handler. On success starts connecting to the first
// resolved endpoint and hands the advanced iterator to HandleConnect; on
// failure only logs the error message.
void AsyncTCPClient::HandleResolve(const boost::system::error_code & iErr, tcp::resolver::iterator iEndpointIterator)
{
if(!iErr)
{
tcp::endpoint wEndpoint = *iEndpointIterator;
mSocket.async_connect(wEndpoint, boost::bind(&AsyncTCPClient::HandleConnect, this, boost::asio::placeholders::error, ++iEndpointIterator));
}
else
{
LOG(LOGERROR) << "Error: " << iErr.message();
}
}
// Connect completion handler. On success writes the queued request header and
// continues in HandleWriteFile. On failure retries with the next endpoint
// while iEndpointIterator is not the end iterator; otherwise logs the error.
void AsyncTCPClient::HandleConnect(const boost::system::error_code &iErr, tcp::resolver::iterator iEndpointIterator)
{
if(!iErr)
{
boost::asio::async_write(mSocket, mRequest, boost::bind(&AsyncTCPClient::HandleWriteFile, this, boost::asio::placeholders::error));
}
else if(iEndpointIterator != tcp::resolver::iterator())
{
// Close the socket before trying the next candidate endpoint.
mSocket.close();
tcp::endpoint wEndpoint = *iEndpointIterator;
mSocket.async_connect(wEndpoint, boost::bind(&AsyncTCPClient::HandleConnect, this, boost::asio::placeholders::error, ++iEndpointIterator));
}
else
{
LOG(LOGERROR) << "Error: " << iErr.message();
}
}
// Write completion handler that drives the upload. While mSourceFile is in a
// good state it reads the next chunk into mBuffer and writes it, naming itself
// as the completion handler again. Once the stream is no longer good it starts
// reading the "[;]"-terminated acknowledgement. Throws std::exception when the
// previous write reported an error.
// NOTE(review): if a read returns zero bytes while the stream was still good,
// the function returns without starting the acknowledgement read.
void AsyncTCPClient::HandleWriteFile(const boost::system::error_code& iErr)
{
if(!iErr)
{
if(mSourceFile)
{
mSourceFile.read(mBuffer.c_array(), (std::streamsize)mBuffer.size());
// EOF reached
if(mSourceFile.gcount() <= 0)
{
return;
}
//LOG(LOGTRACE) << "Send " << mSourceFile.gcount() << "bytes, total: " << mSourceFile.tellg() << " bytes.\n";
// Send only the bytes actually read; the last chunk may be shorter than mBuffer.
boost::asio::async_write(mSocket, boost::asio::buffer(mBuffer.c_array(), mSourceFile.gcount()), boost::bind(&AsyncTCPClient::HandleWriteFile, this, boost::asio::placeholders::error));
}
else
{
LOG(LOGINFO) << "File transfer done";
/// async_read responsible for receiving a simple "ack[;]" once server is done receiving
/// when I don't do this and simply return the server receives the file properly and sends the ack
/// when I do this the server stops never receives the full file and simply waits for all the bytes to arrive which doesn't happen
boost::asio::async_read_until(mSocket, mRecBuf, "[;]", boost::bind(&AsyncTCPClient::HandleReceiveAcknowledge, this, boost::asio::placeholders::error));
}
}
else
{
LOG(LOGERROR) << "Error value: " << iErr.value();
LOG(LOGERROR) << "Error message: " << iErr.message();
throw std::exception();
}
}
// Completion handler for the acknowledgement read. On success drains mRecBuf
// into a string and logs it; on error logs the error code and message and
// throws std::exception.
void AsyncTCPClient::HandleReceiveAcknowledge(const boost::system::error_code& iErr)
{
if(!iErr)
{
// Copies everything currently buffered in mRecBuf, not just the text up to "[;]".
std::string wRecData((std::istreambuf_iterator<char>(&mRecBuf)), std::istreambuf_iterator<char>());
LOG(LOGDEBUG1) << "Acknowledged this data: " << wRecData;
return;
}
else
{
// in case of error free resources and bail
LOG(LOGERROR) << "Error value: " << iErr.value();
LOG(LOGERROR) << "Error message: " << iErr.message();
throw std::exception();
}
}
服务器实现:
#include "StdAfx.h"
#include <boost/array.hpp>
#include <boost/bind.hpp>
#include <boost/thread.hpp>
#include <iostream>
#include <fstream>
#include <boost/enable_shared_from_this.hpp>
#include "AsyncTCPClient.h"
#include "AsyncTCPServer.h"
#include "Debug.h"
// Opens an IPv4 acceptor on iPort, creates one AsyncTCPConnection that will
// write to iFilePath, starts a single asynchronous accept for it and then runs
// the io_service from inside the constructor.
AsyncTCPServer::AsyncTCPServer(unsigned short iPort, const std::string iFilePath)
:mAcceptor(mIoService, boost::asio::ip::tcp::endpoint(boost::asio::ip::tcp::v4(), iPort), true)
{
mAsyncTCPConnectionPtr wNewConnection(new AsyncTCPConnection(mIoService, iFilePath));
// The connection pointer is bound into the handler, which keeps it alive until HandleAccept runs.
mAcceptor.async_accept(wNewConnection->Socket(), boost::bind(&AsyncTCPServer::HandleAccept, this, wNewConnection, boost::asio::placeholders::error));
// Runs the event loop here, so the constructor does not return until run() does.
mIoService.run();
}
// Stops the server's io_service on destruction.
AsyncTCPServer::~AsyncTCPServer()
{
mIoService.stop();
}
// Accept completion handler: starts the accepted connection on success,
// otherwise logs the error code and message. No further accept is queued.
void AsyncTCPServer::HandleAccept(mAsyncTCPConnectionPtr iCurConnection, const boost::system::error_code& iErr)
{
if (!iErr)
{
iCurConnection->Start();
}
else
{
BIOLOG(BioSans::LOGERROR) << " " << iErr << ", " << iErr.message();
}
}
连接实现:
#include "StdAfx.h"
#include <boost/bind.hpp>
#include <boost/thread.hpp>
#include <iostream>
#include <fstream>
#include "Debug.h"
#include "AsyncTCPConnection.h"
// Creates an unconnected socket on iIoService, sets the expected file size to
// zero and stores iFilePath as the path the received file is written to.
AsyncTCPConnection::AsyncTCPConnection(boost::asio::io_service& iIoService, const std::string iFilePath)
: mSocket(iIoService), mFileSize(0), mFilePath(iFilePath)
{
}
// Empty destructor: no explicit cleanup is performed here.
AsyncTCPConnection::~AsyncTCPConnection()
{
}
// Starts the session by reading the request header, which ends at the first
// "\n\n". The handler is bound to shared_from_this(), so the connection stays
// alive while the read is pending.
void AsyncTCPConnection::Start()
{
LOG(LOGINFO) << "Start";
async_read_until(mSocket, mRequestBuffer, "\n\n", boost::bind(&AsyncTCPConnection::HandleReadRequest, shared_from_this(), boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred));
}
// Completion handler for the header read. Parses the sender's file name and
// the file size, opens the output file, copies whatever is still buffered in
// mRequestBuffer after the header into the file, then starts reading the rest
// of the content from the socket.
void AsyncTCPConnection::HandleReadRequest(const boost::system::error_code& iErr, std::size_t iBytesTransferred)
{
if(iErr)
{
return HandleError(__FUNCTION__, iErr);
}
LOG(LOGTRACE) << "(" << iBytesTransferred << ")" << ", in_avail = " << mRequestBuffer.in_avail() << ", size = " << mRequestBuffer.size() << ", max_size = " << mRequestBuffer.max_size();
std::istream wRequestStream(&mRequestBuffer);
std::string wFilePath;
wRequestStream >> wFilePath;
wRequestStream >> mFileSize;
// Skips two characters, assumed to be the "\n\n" delimiter; the stream state is not checked.
wRequestStream.read(mBuffer.c_array(), 2);
// The file is written to mFilePath (set in the constructor), not to the name sent by the client.
mOutputFile.open(mFilePath, std::ios_base::binary);
if(!mOutputFile)
{
// NOTE(review): logs wFilePath although the path that failed to open is mFilePath.
LOG(LOGERROR) << "Failed to open: " << wFilePath;
return;
}
// Drain the bytes left in mRequestBuffer after the header; the final pass reads 0 bytes and ends the loop.
do
{
wRequestStream.read(mBuffer.c_array(), (std::streamsize)mBuffer.size());
LOG(LOGTRACE) << "Write " << wRequestStream.gcount() << " bytes";
mOutputFile.write(mBuffer.c_array(), wRequestStream.gcount());
}
while(wRequestStream.gcount() > 0);
async_read(mSocket, boost::asio::buffer(mBuffer.c_array(), mBuffer.size()),boost::bind(&AsyncTCPConnection::HandleReadFileContent, shared_from_this(), boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred));
}
// Completion handler for one content read. Appends the received bytes to the
// output file, sends "ack[;]" once the file position reaches mFileSize, and
// queues another read unless an error was reported.
// NOTE(review): every read asks for a full mBuffer.size() bytes, regardless of
// how many bytes of the file are still outstanding.
void AsyncTCPConnection::HandleReadFileContent(const boost::system::error_code& iErr, std::size_t iBytesTransferred)
{
if(iBytesTransferred>0)
{
mOutputFile.write(mBuffer.c_array(), (std::streamsize)iBytesTransferred);
LOG(LOGTRACE) << "Received " << mOutputFile.tellp() << " bytes";
if (mOutputFile.tellp()>=(std::streamsize)mFileSize)
{
/// file is received, send a simple ack message
/// this code is never reach when I launch the last async_read_until on the client side
char *wAckMsg = "ack[;]";
// NOTE(review): this handler is bound to the raw `this`, unlike the reads, which use shared_from_this().
boost::asio::async_write(mSocket, boost::asio::buffer(wAckMsg, strlen(wAckMsg)), boost::bind(&AsyncTCPConnection::HandleAcknowledge, this, boost::asio::placeholders::error));
// NOTE(review): no return here, so execution falls through to the checks and the async_read below.
}
}
if(iErr)
{
return HandleError(__FUNCTION__, iErr);
}
async_read(mSocket, boost::asio::buffer(mBuffer.c_array(), mBuffer.size()), boost::bind(&AsyncTCPConnection::HandleReadFileContent, shared_from_this(), boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred));
}
// Completion handler for the acknowledgement write. Logs success; on error
// logs the error code and message and throws std::exception.
void AsyncTCPConnection::HandleAcknowledge(const boost::system::error_code& iErr)
{
if(!iErr)
{
LOG(LOGDEBUG1) << "Message acknowledged";
return;
}
else
{
// in case of error free resources and bail
LOG(LOGERROR) << "Error value: " << iErr.value();
LOG(LOGERROR) << "Error message: " << iErr.message();
throw std::exception();
}
}
// Logs the name of the function that failed together with the error code and
// its message. Does nothing else.
void AsyncTCPConnection::HandleError(const std::string& function_name, const boost::system::error_code& err)
{
LOG(LOGERROR) << " in " << function_name <<" due to " << err <<" " << err.message();
}
发送文件的代码:
// Client driver: the constructor queues the resolve; run() then executes the
// queued asynchronous operations and their handlers.
boost::asio::io_service wIoService;
AsyncTCPClient client(wIoService, iServerIP, iFilePath);
wIoService.run();
我一直在寻找答案,但完全不明白到底发生了什么、又为什么会发生。提前致谢。
在服务器(接收)端,你重复
async_read(mSocket, boost::asio::buffer(mBuffer.c_array(), mBuffer.size()),
boost::bind(&AsyncTCPConnection::HandleReadFileContent, shared_from_this(),
boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred));
直到接收完成。这在以前看起来一直运作良好,因为最后一次读取会以 eof 错误返回:
This function is used to asynchronously read a certain number of bytes of data from a stream. The function call always returns immediately. The asynchronous operation will continue until one of the following conditions is true:
- The supplied buffer is full (that is, it has reached maximum size).
- An error occurred.
但是现在,由于客户端 没有 关闭套接字(因为它正在等待 ACK),最后一次读取会一直等待,直到填满整个缓冲区。如果文件大小恰好使最后一块数据正好填满缓冲区,您 可能会 碰巧收到完整的文件。
其他问题:
char *wAckMsg = "ack[;]";
boost::asio::async_write(
mSocket, boost::asio::buffer(wAckMsg, strlen(wAckMsg)),
boost::bind(&AsyncTCPConnection::HandleAcknowledge, this, boost::asio::placeholders::error));
在这5行中有多少个错误:
- 字符串字面量 不能 被视为
char*
,而必须被视为 char const (&)[]
- 缓冲区指向那个局部变量。再一次,您可能很幸运,因为字符串文字通常驻留在静态数据段中,而本地只是指向它的指针,但这不是您应该依赖的东西。异步操作期间局部变量不存在,这可能导致 Undefined Behaviour 取决于使用的编译器
- 您将完成处理程序绑定到
this
而不是 shared_from_this
。这意味着将释放对 mAsyncTCPConnectionPtr
的最后引用,然后运行析构函数 ~AsyncTCPConnection
。这将销毁套接字,而这可能发生在 Ack 发送完成之前,也可能发生在之后。在多线程服务器中,由于数据竞争,这很容易导致 Undefined Behaviour。
- 发起 async_write 之后,您没有从函数中返回。这意味着后面的代码仍然会被执行,包括接下来的
async_read(mSocket, boost::asio::buffer(mBuffer.c_array(), mBuffer.size()), ...
.
我建议将以下内容作为最低限度的修复:
static char const *s_wAckMsg = "ack[;]";
boost::asio::async_write(
mSocket, boost::asio::buffer(s_wAckMsg, strlen(s_wAckMsg)),
boost::bind(&AsyncTCPConnection::HandleAcknowledge, shared_from_this(), boost::asio::placeholders::error));
return;
其他小问题
请求解析不够健壮(不处理错误,也不检查格式。最后你只是盲目地读走 2 个字符,并 假设 它们就是 '\n\n'
,但实际上并不能保证)。
如果打开输出文件失败,您在日志中报告的是错误的文件名(打印的是 wFilePath,而实际打开的是 mFilePath)。
您可以使用 boost::asio::async_connect
而不是笨拙的处理程序链。
如果您只想打印它,则无需使用 istreambuf_iterator<char>
构建字符串。
让我们尝试修复它
我补充了一些代码让它能够独立编译,并尝试通过限制每次读取的大小来修复这个错误。
async_read
调用重复,所以让我们删除重复:
void AsyncTCPConnection::DoReceiveFileContent() {
size_t expect = (mFileSize <= mOutputFile.tellp())? 0 : mFileSize - mOutputFile.tellp();
LOG(LOGDEBUG1) << "expectedContent: " << expect;
expect = std::min(mBuffer.size(), expect);
async_read(mSocket, boost::asio::buffer(mBuffer.c_array(), expect),
boost::bind(&AsyncTCPConnection::HandleReadFileContent, shared_from_this(),
boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred));
}
现在只要我们想安排更多读取操作,我们就调用 DoReceiveFileContent()
。
演示
//#include "AsyncTCPConnection.h"
//#include "Debug.h"
#include <boost/asio.hpp>
#include <boost/bind.hpp>
#include <boost/enable_shared_from_this.hpp>
#include <boost/thread.hpp>
#include <fstream>
#include <iostream>
/// Fixed-size, zero-initialised 1024-byte buffer exposing the size()/c_array()
/// interface used by the transfer code.
struct Buffer {
    /// Mutable pointer to the first byte of the storage.
    char *c_array() { return m_array; }
    /// Read-only pointer to the first byte of the storage.
    char const *c_array() const { return m_array; }
    /// Capacity of the storage in bytes.
    size_t size() const { return sizeof m_array; }

  private:
    char m_array[1024] = {};
};
/// One log line written to std::cout: the constructor prints the tag followed
/// by a tab, each operator<< appends a value, and the last owner of the line
/// ends it with std::endl when destroyed.
struct LogTx {
    LogTx(std::string const &name) { std::cout << name << "\t"; }

    /// Moving hands over responsibility for terminating the line.
    LogTx(LogTx &&other) : armed(other.armed) { other.armed = false; }

    ~LogTx() {
        if (!armed)
            return;
        std::cout << std::endl;
    }

    /// Appends the value to the line and passes the line on to the next <<.
    template <typename... T> friend LogTx operator<<(LogTx tx, T &&... args) {
        (std::cout << ... << args);
        return tx;
    }

  private:
    bool armed = true; ///< true while this object still has to end the line
};
#define LOG(x) LogTx("LOG:" #x)
#define BIOLOG(x) LogTx("BIOLOG:" #x)
using boost::asio::ip::tcp;
// Server-side session for one accepted socket: reads a request header followed
// by file content and writes the content to a local output file. Derives from
// enable_shared_from_this so handlers can be bound to shared_from_this().
struct AsyncTCPConnection : boost::enable_shared_from_this<AsyncTCPConnection> {
AsyncTCPConnection(boost::asio::io_service &iIoService, const std::string iFilePath);
~AsyncTCPConnection();
// Begins the session by reading the request header.
void Start();
// Completion handler for the header read.
void HandleReadRequest(const boost::system::error_code &iErr, std::size_t iBytesTransferred);
// Completion handler for one chunk of file content.
void HandleReadFileContent(const boost::system::error_code &iErr, std::size_t iBytesTransferred);
// Queues the next content read.
void DoReceiveFileContent();
// Completion handler for the acknowledgement write.
void HandleAcknowledge(const boost::system::error_code &iErr);
// Logs a failed operation.
void HandleError(const std::string &function_name, const boost::system::error_code &err);
// Socket accessor, used by the acceptor to accept into this connection.
tcp::socket &Socket() { return mSocket; }
private:
boost::asio::streambuf mRequestBuffer; // receives the header, plus any content read along with it
Buffer mBuffer;                        // scratch buffer for content chunks
tcp::socket mSocket;
std::streamsize mFileSize;             // file size announced in the request header
std::string mOutputFilePath;           // local path the received content is written to
std::ofstream mOutputFile;
};
/// Creates an unconnected session on iIoService that will store the received
/// file at iFilePath; the expected size stays zero until a request is parsed.
AsyncTCPConnection::AsyncTCPConnection(boost::asio::io_service &iIoService, const std::string iFilePath)
    : mSocket(iIoService),
      mFileSize(0),
      mOutputFilePath(iFilePath) {
}
/// Nothing to release explicitly; the members clean up after themselves.
AsyncTCPConnection::~AsyncTCPConnection() = default;
void AsyncTCPConnection::Start() {
LOG(LOGINFO) << "Start";
async_read_until(mSocket, mRequestBuffer, "\n\n",
boost::bind(&AsyncTCPConnection::HandleReadRequest, shared_from_this(),
boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred));
}
/// Completion handler for the header read.
///
/// Parses "<name>\n<size>\n\n" from mRequestBuffer, opens the output file,
/// copies any file content that arrived together with the header into it and
/// then starts receiving the remaining content.
///
/// @param iErr              result of the async_read_until operation
/// @param iBytesTransferred number of bytes up to and including the delimiter
void AsyncTCPConnection::HandleReadRequest(const boost::system::error_code &iErr, std::size_t iBytesTransferred) {
    if (iErr) {
        return HandleError(__FUNCTION__, iErr);
    }
    LOG(LOGTRACE) << "(" << iBytesTransferred << ")"
                  << ", in_avail = " << mRequestBuffer.in_avail() << ", size = " << mRequestBuffer.size();

    std::istream wRequestStream(&mRequestBuffer);
    std::string wFilePath;
    wRequestStream >> wFilePath;
    LOG(LOGTRACE) << "Original filename " << wFilePath;
    wRequestStream >> mFileSize;
    LOG(LOGTRACE) << "Original filesize " << mFileSize;

    // Consume the header terminator and verify it really is "\n\n" rather
    // than silently discarding two arbitrary bytes.
    char wDelimiter[2] = {0, 0};
    wRequestStream.read(wDelimiter, 2);

    // Validate the request BEFORE opening the output file, so that a
    // malformed request does not create or truncate the destination.
    if (!wRequestStream || wDelimiter[0] != '\n' || wDelimiter[1] != '\n' || mFileSize < 0) {
        LOG(LOGERROR) << "Request could not be parsed";
        return;
    }

    mOutputFile.open(mOutputFilePath, std::ios_base::binary);
    if (!mOutputFile) {
        LOG(LOGERROR) << "Failed to open: " << mOutputFilePath;
        return;
    }

    // async_read_until may have buffered content beyond the delimiter; those
    // bytes are the start of the file. The final pass reads 0 bytes and ends
    // the loop.
    do {
        wRequestStream.read(mBuffer.c_array(), (std::streamsize)mBuffer.size());
        LOG(LOGTRACE) << "Write " << wRequestStream.gcount() << " bytes";
        mOutputFile.write(mBuffer.c_array(), wRequestStream.gcount());
    } while (wRequestStream.gcount() > 0);

    DoReceiveFileContent();
}
void AsyncTCPConnection::DoReceiveFileContent() {
size_t expect = (mFileSize <= mOutputFile.tellp())? 0 : mFileSize - mOutputFile.tellp();
LOG(LOGDEBUG1) << "expectedContent: " << expect;
expect = std::min(mBuffer.size(), expect);
async_read(mSocket, boost::asio::buffer(mBuffer.c_array(), expect),
boost::bind(&AsyncTCPConnection::HandleReadFileContent, shared_from_this(),
boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred));
}
/// Completion handler for one chunk of file content.
///
/// Appends the received bytes to the output file. Once the file position has
/// reached the announced size it sends the "ack[;]" acknowledgement and stops
/// reading; otherwise it reports a pending error or queues the next read.
///
/// @param iErr              result of the async_read operation
/// @param iBytesTransferred number of bytes placed in mBuffer by that read
void AsyncTCPConnection::HandleReadFileContent(const boost::system::error_code &iErr, std::size_t iBytesTransferred) {
    if (iBytesTransferred > 0) {
        mOutputFile.write(mBuffer.c_array(), (std::streamsize)iBytesTransferred);
        LOG(LOGTRACE) << "Received " << mOutputFile.tellp() << " bytes (+" << iBytesTransferred << ")";
    }

    // The completion check must also run when this read delivered nothing.
    // If the whole file arrived together with the header (or the file is
    // empty), the zero-length read completes at once with 0 bytes; skipping
    // the check in that case would re-arm the read forever without ever
    // sending the acknowledgement.
    if (mOutputFile.tellp() >= (std::streamsize)mFileSize) {
        LOG(LOGTRACE) << "Receive complete at " << mFileSize << " bytes";
        // Static storage: the buffer must stay valid until the write completes.
        static const char s_wAckMsg[] = "ack[;]";
        boost::asio::async_write(
            mSocket, boost::asio::buffer(s_wAckMsg, sizeof(s_wAckMsg) - 1),
            boost::bind(&AsyncTCPConnection::HandleAcknowledge, shared_from_this(), boost::asio::placeholders::error));
        return;
    }

    if (iErr) {
        return HandleError(__FUNCTION__, iErr);
    }
    DoReceiveFileContent();
}
/// Completion handler for the acknowledgement write: logs success, or logs
/// the error code and message and throws std::exception.
void AsyncTCPConnection::HandleAcknowledge(const boost::system::error_code &iErr) {
    if (iErr) {
        // in case of error free resources and bail
        LOG(LOGERROR) << "Error value: " << iErr.value();
        LOG(LOGERROR) << "Error message: " << iErr.message();
        throw std::exception();
    }
    LOG(LOGDEBUG1) << "Message ACK sent";
}
/// Logs which function failed together with the error code and its message.
void AsyncTCPConnection::HandleError(const std::string &function_name, const boost::system::error_code &err) {
    const std::string wReason = err.message();
    LOG(LOGERROR) << " in " << function_name << " due to " << err << " " << wReason;
}
#include <boost/thread.hpp>
//#include "AsyncTCPClient.h"
// Client that resolves and connects to a server, sends a request header
// followed by the content of a local file, then waits for an acknowledgement.
struct AsyncTCPClient {
AsyncTCPClient(boost::asio::io_service &iIoService, const std::string &iServerIP, const std::string &iPath);
~AsyncTCPClient();
std::ifstream mSourceFile;                // file being sent
boost::asio::streambuf mRequest, mAckBuf; // outgoing request header / incoming acknowledgement
tcp::resolver mResolver;
tcp::socket mSocket;
// Completion handler for the address resolve.
void HandleResolve(const boost::system::error_code &iErr, tcp::resolver::iterator iEndpointIterator);
// Completion handler for a connect attempt.
void HandleConnect(const boost::system::error_code &iErr, tcp::resolver::iterator iEndpointIterator);
// Completion handler for the header write and for each content write.
void HandleWriteFile(const boost::system::error_code &iErr);
// Completion handler for the acknowledgement read.
void HandleReceiveAcknowledge(const boost::system::error_code &iErr);
Buffer mBuffer;                           // scratch buffer for file chunks
};
/// Prepares a transfer of iPath to the server named by iServerIP.
///
/// Splits iServerIP at ':' into host and port, opens the file, queues the
/// request header "<path>\n<size>\n\n" in mRequest and starts an asynchronous
/// resolve. Nothing runs until the caller runs iIoService. On any setup
/// failure an error is logged and no asynchronous work is started.
///
/// @param iIoService io_service that executes all asynchronous operations
/// @param iServerIP  server address in the form "host:port"
/// @param iPath      path of the file to send
AsyncTCPClient::AsyncTCPClient(boost::asio::io_service &iIoService, const std::string &iServerIP,
                               const std::string &iPath)
    : mResolver(iIoService), mSocket(iIoService) {
    const std::string::size_type wPos = iServerIP.find(':');
    if (wPos == std::string::npos) {
        // Report the problem instead of returning silently with nothing queued.
        LOG(LOGERROR) << "Invalid server address (expected host:port): " << iServerIP;
        return;
    }
    const std::string wServerIP = iServerIP.substr(0, wPos);
    const std::string wPortStr = iServerIP.substr(wPos + 1);

    // Open positioned at the end so that tellg() yields the file size.
    mSourceFile.open(iPath, std::ios_base::binary | std::ios_base::ate);
    if (!mSourceFile) {
        LOG(LOGERROR) << "Failed to open file: " << iPath;
        return;
    }
    // tellg() returns -1 on failure; converting that to size_t would announce
    // an enormous file size to the server.
    const std::streamoff wEnd = mSourceFile.tellg();
    if (wEnd < 0) {
        LOG(LOGERROR) << "Failed to determine size of file: " << iPath;
        return;
    }
    const size_t wFileSize = static_cast<size_t>(wEnd);
    mSourceFile.seekg(0);

    std::ostream wRequestStream(&mRequest);
    wRequestStream << iPath << "\n" << wFileSize << "\n\n";
    LOG(LOGINFO) << "File to transfer: " << iPath;
    LOG(LOGINFO) << "Filesize: " << wFileSize << " bytes";

    tcp::resolver::query wQuery(wServerIP, wPortStr);
    mResolver.async_resolve(wQuery, boost::bind(&AsyncTCPClient::HandleResolve, this, boost::asio::placeholders::error,
                                                boost::asio::placeholders::iterator));
}
/// Nothing to release explicitly; the members clean up after themselves.
AsyncTCPClient::~AsyncTCPClient() = default;
/// Resolve completion handler: logs a failure, otherwise starts connecting to
/// the first endpoint and passes the remaining candidates to HandleConnect.
void AsyncTCPClient::HandleResolve(const boost::system::error_code &iErr, tcp::resolver::iterator iEndpointIterator) {
    if (iErr) {
        LOG(LOGERROR) << "Error: " << iErr.message();
        return;
    }
    const tcp::endpoint wFirst = *iEndpointIterator;
    // Advance past the endpoint being tried; the handler receives the rest.
    ++iEndpointIterator;
    mSocket.async_connect(wFirst, boost::bind(&AsyncTCPClient::HandleConnect, this,
                                              boost::asio::placeholders::error, iEndpointIterator));
}
/// Connect completion handler: on success sends the request header; on failure
/// tries the next endpoint, or logs the error when none are left.
void AsyncTCPClient::HandleConnect(const boost::system::error_code &iErr, tcp::resolver::iterator iEndpointIterator) {
    if (!iErr) {
        boost::asio::async_write(mSocket, mRequest,
                                 boost::bind(&AsyncTCPClient::HandleWriteFile, this, boost::asio::placeholders::error));
        return;
    }
    if (iEndpointIterator == tcp::resolver::iterator()) {
        // Every candidate endpoint has been tried.
        LOG(LOGERROR) << "Error: " << iErr.message();
        return;
    }
    // Close the socket before trying the next candidate endpoint.
    mSocket.close();
    const tcp::endpoint wNext = *iEndpointIterator;
    ++iEndpointIterator;
    mSocket.async_connect(wNext, boost::bind(&AsyncTCPClient::HandleConnect, this,
                                             boost::asio::placeholders::error, iEndpointIterator));
}
/// Write completion handler that drives the upload.
///
/// Each call reads the next chunk of the source file and writes it to the
/// socket, naming itself as the completion handler again. When the file is
/// exhausted it starts waiting for the server's "[;]"-terminated
/// acknowledgement.
///
/// @param iErr result of the previous write (request header or file chunk)
/// @throws std::exception if the previous write failed
void AsyncTCPClient::HandleWriteFile(const boost::system::error_code &iErr) {
    if (iErr) {
        LOG(LOGERROR) << "Error value: " << iErr.value();
        LOG(LOGERROR) << "Error message: " << iErr.message();
        throw std::exception();
    }

    if (mSourceFile) {
        mSourceFile.read(mBuffer.c_array(), (std::streamsize)mBuffer.size());
        const std::streamsize wCount = mSourceFile.gcount();
        if (wCount > 0) {
            // Send only the bytes actually read; the last chunk may be short.
            boost::asio::async_write(
                mSocket, boost::asio::buffer(mBuffer.c_array(), wCount),
                boost::bind(&AsyncTCPClient::HandleWriteFile, this, boost::asio::placeholders::error));
            return;
        }
        // A zero-byte read happens when the file size is an exact multiple of
        // the buffer size (or the file is empty). Everything has been sent,
        // so fall through and wait for the acknowledgement rather than
        // returning with no operation pending.
        LOG(LOGINFO) << "EOF reached";
    }

    LOG(LOGINFO) << "File transmission done";
    // Wait for the server's "ack[;]" confirming that it received the whole file.
    boost::asio::async_read_until(
        mSocket, mAckBuf, "[;]",
        boost::bind(&AsyncTCPClient::HandleReceiveAcknowledge, this, boost::asio::placeholders::error));
}
/// Completion handler for the acknowledgement read: logs the buffered reply,
/// or logs the error code and message and throws std::exception.
void AsyncTCPClient::HandleReceiveAcknowledge(const boost::system::error_code &iErr) {
    if (iErr) {
        // in case of error free resources and bail
        LOG(LOGERROR) << "Error value: " << iErr.value();
        LOG(LOGERROR) << "Error message: " << iErr.message();
        throw std::exception();
    }
    // Streaming the streambuf pointer prints the buffer's contents.
    LOG(LOGDEBUG1) << "Acknowledged this data: " << &mAckBuf;
}
//////////////////////////////////////////////////
//////////////////////////////////////////////////
#include <boost/array.hpp>
#include <boost/bind.hpp>
#include <boost/enable_shared_from_this.hpp>
#include <boost/thread.hpp>
#include <fstream>
#include <iostream>
//#include "AsyncTCPClient.h"
//#include "AsyncTCPServer.h"
//#include "Debug.h"
// Server that accepts one connection and hands it to an AsyncTCPConnection.
struct AsyncTCPServer {
using mAsyncTCPConnectionPtr = boost::shared_ptr<AsyncTCPConnection>;
AsyncTCPServer(unsigned short iPort, const std::string iFilePath);
~AsyncTCPServer();
// Completion handler for the asynchronous accept.
void HandleAccept(mAsyncTCPConnectionPtr iCurConnection, const boost::system::error_code &iErr);
// Declared before mAcceptor because the acceptor is constructed from it.
boost::asio::io_service mIoService;
tcp::acceptor mAcceptor;
};
/// Opens an IPv4 acceptor on iPort, queues the accept of a single connection
/// that will write to iFilePath, and runs the io_service inside the
/// constructor, so construction does not finish until run() returns.
AsyncTCPServer::AsyncTCPServer(unsigned short iPort, const std::string iFilePath)
    : mAcceptor(mIoService, tcp::endpoint(tcp::v4(), iPort), true) {
    mAsyncTCPConnectionPtr wNewConnection(new AsyncTCPConnection(mIoService, iFilePath));
    mAcceptor.set_option(tcp::acceptor::reuse_address(true));

    // Binding the connection pointer into the handler keeps it alive until HandleAccept runs.
    auto wOnAccept = boost::bind(&AsyncTCPServer::HandleAccept, this, wNewConnection,
                                 boost::asio::placeholders::error);
    mAcceptor.async_accept(wNewConnection->Socket(), wOnAccept);

    mIoService.run();
}
/// Stops the server's io_service on destruction.
AsyncTCPServer::~AsyncTCPServer() {
    mIoService.stop();
}
/// Accept completion handler: logs a failure, otherwise starts the accepted
/// connection. No further accept is queued.
void AsyncTCPServer::HandleAccept(mAsyncTCPConnectionPtr iCurConnection, const boost::system::error_code &iErr) {
    if (iErr) {
        BIOLOG(BioSans::LOGERROR) << " " << iErr << ", " << iErr.message();
        return;
    }
    iCurConnection->Start();
}
int main() {
boost::thread th([] {
boost::asio::io_service wIoService;
AsyncTCPClient client(wIoService, "127.0.0.1:6767",
//"/etc/dictionaries-common/words"
"main.cpp"
);
boost::this_thread::sleep_for(boost::chrono::seconds(1));
wIoService.run();
});
AsyncTCPServer server(6767, "outputfile.txt");
}
输出
LOG:LOGINFO File to transfer: main.cpp
LOG:LOGINFO Filesize: 12793 bytes
LOG:LOGINFO Start
LOG:LOGTRACE (16), in_avail = 512, size = 512
LOG:LOGTRACE Original filename main.cpp
LOG:LOGTRACE Original filesize 12793
LOG:LOGTRACE Write 496 bytes
LOG:LOGTRACE Write 0 bytes
LOG:LOGDEBUG1 expectedContent: 12297
LOG:LOGINFO File transmission done
LOG:LOGTRACE Received 1520 bytes (+1024)
LOG:LOGDEBUG1 expectedContent: 11273
LOG:LOGTRACE Received 2544 bytes (+1024)
LOG:LOGDEBUG1 expectedContent: 10249
LOG:LOGTRACE Received 3568 bytes (+1024)
LOG:LOGDEBUG1 expectedContent: 9225
LOG:LOGTRACE Received 4592 bytes (+1024)
LOG:LOGDEBUG1 expectedContent: 8201
LOG:LOGTRACE Received 5616 bytes (+1024)
LOG:LOGDEBUG1 expectedContent: 7177
LOG:LOGTRACE Received 6640 bytes (+1024)
LOG:LOGDEBUG1 expectedContent: 6153
LOG:LOGTRACE Received 7664 bytes (+1024)
LOG:LOGDEBUG1 expectedContent: 5129
LOG:LOGTRACE Received 8688 bytes (+1024)
LOG:LOGDEBUG1 expectedContent: 4105
LOG:LOGTRACE Received 9712 bytes (+1024)
LOG:LOGDEBUG1 expectedContent: 3081
LOG:LOGTRACE Received 10736 bytes (+1024)
LOG:LOGDEBUG1 expectedContent: 2057
LOG:LOGTRACE Received 11760 bytes (+1024)
LOG:LOGDEBUG1 expectedContent: 1033
LOG:LOGTRACE Received 12784 bytes (+1024)
LOG:LOGDEBUG1 expectedContent: 9
LOG:LOGTRACE Received 12793 bytes (+9)
LOG:LOGTRACE Receive complete at 12793 bytes
LOG:LOGDEBUG1 Message ACK sent
LOG:LOGDEBUG1 Acknowledged this data: ack[;]
文件确实完全相同:
d61f0515bc4ba003497d67e265b5e0bc main.cpp
d61f0515bc4ba003497d67e265b5e0bc outputfile.txt
我正在使用 boost asio 通过 TCP 执行文件传输。文件传输有效,但是当我决定通过链接 async_write
(在服务器上)和 async_read_until
(在客户端上)来实现从服务器到客户端的简单确认消息时,我观察到一个奇怪的行为:服务器端不再能正确接收文件。在距离传输结束还差几百个字节时,服务器就不再收到任何字节,因此永远不会调用 async_write
负责确认文件传输。
当我写完文件后在客户端调用 async_read_until
时似乎会发生这种情况。由于某种原因,它影响了当前的文件传输。
客户端实现:
#include "StdAfx.h"
#include <boost/bind.hpp>
#include <boost/thread.hpp>
#include <boost/enable_shared_from_this.hpp>
#include <boost/thread.hpp>
#include "AsyncTCPClient.h"
// Splits iServerIP at ':' into host and port, opens iPath for binary reading,
// queues the request header in mRequest and starts an asynchronous resolve of
// the server address. Returns without starting any asynchronous operation if
// iServerIP contains no ':' or the file cannot be opened.
AsyncTCPClient::AsyncTCPClient(boost::asio::io_service& iIoService, const std::string& iServerIP, const std::string& iPath)
: mResolver(iIoService), mSocket(iIoService)
{
size_t wPos = iServerIP.find(':');
if(wPos==std::string::npos)
{
// No port separator: nothing is logged here.
return;
}
std::string wPortStr = iServerIP.substr(wPos + 1);
std::string wServerIP = iServerIP.substr(0, wPos);
// Opened positioned at the end (ate) so that tellg() below yields the file size.
mSourceFile.open(iPath, std::ios_base::binary | std::ios_base::ate);
if(!mSourceFile)
{
LOG(LOGERROR) << "Failed to open file: " << iPath;
return;
}
size_t wFileSize = mSourceFile.tellg();
// Rewind so that later reads start at the beginning of the file.
mSourceFile.seekg(0);
std::ostream wRequestStream(&mRequest);
// Header layout: "<path>\n<size>\n\n".
wRequestStream << iPath << "\n" << wFileSize << "\n\n";
LOG(LOGINFO) << "File to transfer: " << iPath;
LOG(LOGINFO) << "Filesize: " << wFileSize << " bytes";
tcp::resolver::query wQuery(wServerIP, wPortStr);
mResolver.async_resolve(wQuery, boost::bind(&AsyncTCPClient::HandleResolve, this, boost::asio::placeholders::error, boost::asio::placeholders::iterator));
}
// Empty destructor: no explicit cleanup is performed here.
AsyncTCPClient::~AsyncTCPClient()
{
}
// Resolve completion handler. On success starts connecting to the first
// resolved endpoint and hands the advanced iterator to HandleConnect; on
// failure only logs the error message.
void AsyncTCPClient::HandleResolve(const boost::system::error_code & iErr, tcp::resolver::iterator iEndpointIterator)
{
if(!iErr)
{
tcp::endpoint wEndpoint = *iEndpointIterator;
mSocket.async_connect(wEndpoint, boost::bind(&AsyncTCPClient::HandleConnect, this, boost::asio::placeholders::error, ++iEndpointIterator));
}
else
{
LOG(LOGERROR) << "Error: " << iErr.message();
}
}
// Connect completion handler. On success writes the queued request header and
// continues in HandleWriteFile. On failure retries with the next endpoint
// while iEndpointIterator is not the end iterator; otherwise logs the error.
void AsyncTCPClient::HandleConnect(const boost::system::error_code &iErr, tcp::resolver::iterator iEndpointIterator)
{
if(!iErr)
{
boost::asio::async_write(mSocket, mRequest, boost::bind(&AsyncTCPClient::HandleWriteFile, this, boost::asio::placeholders::error));
}
else if(iEndpointIterator != tcp::resolver::iterator())
{
// Close the socket before trying the next candidate endpoint.
mSocket.close();
tcp::endpoint wEndpoint = *iEndpointIterator;
mSocket.async_connect(wEndpoint, boost::bind(&AsyncTCPClient::HandleConnect, this, boost::asio::placeholders::error, ++iEndpointIterator));
}
else
{
LOG(LOGERROR) << "Error: " << iErr.message();
}
}
// Write completion handler that drives the upload. While mSourceFile is in a
// good state it reads the next chunk into mBuffer and writes it, naming itself
// as the completion handler again. Once the stream is no longer good it starts
// reading the "[;]"-terminated acknowledgement. Throws std::exception when the
// previous write reported an error.
// NOTE(review): if a read returns zero bytes while the stream was still good,
// the function returns without starting the acknowledgement read.
void AsyncTCPClient::HandleWriteFile(const boost::system::error_code& iErr)
{
if(!iErr)
{
if(mSourceFile)
{
mSourceFile.read(mBuffer.c_array(), (std::streamsize)mBuffer.size());
// EOF reached
if(mSourceFile.gcount() <= 0)
{
return;
}
//LOG(LOGTRACE) << "Send " << mSourceFile.gcount() << "bytes, total: " << mSourceFile.tellg() << " bytes.\n";
// Send only the bytes actually read; the last chunk may be shorter than mBuffer.
boost::asio::async_write(mSocket, boost::asio::buffer(mBuffer.c_array(), mSourceFile.gcount()), boost::bind(&AsyncTCPClient::HandleWriteFile, this, boost::asio::placeholders::error));
}
else
{
LOG(LOGINFO) << "File transfer done";
/// async_read responsible for receiving a simple "ack[;]" once server is done receiving
/// when I don't do this and simply return the server receives the file properly and sends the ack
/// when I do this the server stops never receives the full file and simply waits for all the bytes to arrive which doesn't happen
boost::asio::async_read_until(mSocket, mRecBuf, "[;]", boost::bind(&AsyncTCPClient::HandleReceiveAcknowledge, this, boost::asio::placeholders::error));
}
}
else
{
LOG(LOGERROR) << "Error value: " << iErr.value();
LOG(LOGERROR) << "Error message: " << iErr.message();
throw std::exception();
}
}
// Completion handler for the acknowledgement read. On success drains mRecBuf
// into a string and logs it; on error logs the error code and message and
// throws std::exception.
void AsyncTCPClient::HandleReceiveAcknowledge(const boost::system::error_code& iErr)
{
if(!iErr)
{
// Copies everything currently buffered in mRecBuf, not just the text up to "[;]".
std::string wRecData((std::istreambuf_iterator<char>(&mRecBuf)), std::istreambuf_iterator<char>());
LOG(LOGDEBUG1) << "Acknowledged this data: " << wRecData;
return;
}
else
{
// in case of error free resources and bail
LOG(LOGERROR) << "Error value: " << iErr.value();
LOG(LOGERROR) << "Error message: " << iErr.message();
throw std::exception();
}
}
服务器实现:
#include "StdAfx.h"
#include <boost/array.hpp>
#include <boost/bind.hpp>
#include <boost/thread.hpp>
#include <iostream>
#include <fstream>
#include <boost/enable_shared_from_this.hpp>
#include "AsyncTCPClient.h"
#include "AsyncTCPServer.h"
#include "Debug.h"
// Opens an IPv4 acceptor on iPort, creates one AsyncTCPConnection that will
// write to iFilePath, starts a single asynchronous accept for it and then runs
// the io_service from inside the constructor.
AsyncTCPServer::AsyncTCPServer(unsigned short iPort, const std::string iFilePath)
:mAcceptor(mIoService, boost::asio::ip::tcp::endpoint(boost::asio::ip::tcp::v4(), iPort), true)
{
mAsyncTCPConnectionPtr wNewConnection(new AsyncTCPConnection(mIoService, iFilePath));
// The connection pointer is bound into the handler, which keeps it alive until HandleAccept runs.
mAcceptor.async_accept(wNewConnection->Socket(), boost::bind(&AsyncTCPServer::HandleAccept, this, wNewConnection, boost::asio::placeholders::error));
// Runs the event loop here, so the constructor does not return until run() does.
mIoService.run();
}
// Stops the server's io_service on destruction.
AsyncTCPServer::~AsyncTCPServer()
{
mIoService.stop();
}
// Accept completion handler: starts the accepted connection on success,
// otherwise logs the error code and message. No further accept is queued.
void AsyncTCPServer::HandleAccept(mAsyncTCPConnectionPtr iCurConnection, const boost::system::error_code& iErr)
{
if (!iErr)
{
iCurConnection->Start();
}
else
{
BIOLOG(BioSans::LOGERROR) << " " << iErr << ", " << iErr.message();
}
}
连接实现:
#include "StdAfx.h"
#include <boost/bind.hpp>
#include <boost/thread.hpp>
#include <iostream>
#include <fstream>
#include "Debug.h"
#include "AsyncTCPConnection.h"
// Creates an unconnected socket on iIoService, sets the expected file size to
// zero and stores iFilePath as the path the received file is written to.
AsyncTCPConnection::AsyncTCPConnection(boost::asio::io_service& iIoService, const std::string iFilePath)
: mSocket(iIoService), mFileSize(0), mFilePath(iFilePath)
{
}
// Empty destructor: no explicit cleanup is performed here.
AsyncTCPConnection::~AsyncTCPConnection()
{
}
// Starts the session by reading the request header, which ends at the first
// "\n\n". The handler is bound to shared_from_this(), so the connection stays
// alive while the read is pending.
void AsyncTCPConnection::Start()
{
LOG(LOGINFO) << "Start";
async_read_until(mSocket, mRequestBuffer, "\n\n", boost::bind(&AsyncTCPConnection::HandleReadRequest, shared_from_this(), boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred));
}
// Completion handler for the header read. Parses the sender's file name and
// the file size, opens the output file, copies whatever is still buffered in
// mRequestBuffer after the header into the file, then starts reading the rest
// of the content from the socket.
void AsyncTCPConnection::HandleReadRequest(const boost::system::error_code& iErr, std::size_t iBytesTransferred)
{
if(iErr)
{
return HandleError(__FUNCTION__, iErr);
}
LOG(LOGTRACE) << "(" << iBytesTransferred << ")" << ", in_avail = " << mRequestBuffer.in_avail() << ", size = " << mRequestBuffer.size() << ", max_size = " << mRequestBuffer.max_size();
std::istream wRequestStream(&mRequestBuffer);
std::string wFilePath;
wRequestStream >> wFilePath;
wRequestStream >> mFileSize;
// Skips two characters, assumed to be the "\n\n" delimiter; the stream state is not checked.
wRequestStream.read(mBuffer.c_array(), 2);
// The file is written to mFilePath (set in the constructor), not to the name sent by the client.
mOutputFile.open(mFilePath, std::ios_base::binary);
if(!mOutputFile)
{
// NOTE(review): logs wFilePath although the path that failed to open is mFilePath.
LOG(LOGERROR) << "Failed to open: " << wFilePath;
return;
}
// Drain the bytes left in mRequestBuffer after the header; the final pass reads 0 bytes and ends the loop.
do
{
wRequestStream.read(mBuffer.c_array(), (std::streamsize)mBuffer.size());
LOG(LOGTRACE) << "Write " << wRequestStream.gcount() << " bytes";
mOutputFile.write(mBuffer.c_array(), wRequestStream.gcount());
}
while(wRequestStream.gcount() > 0);
async_read(mSocket, boost::asio::buffer(mBuffer.c_array(), mBuffer.size()),boost::bind(&AsyncTCPConnection::HandleReadFileContent, shared_from_this(), boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred));
}
// Completion handler for one content read. Appends the received bytes to the
// output file, sends "ack[;]" once the file position reaches mFileSize, and
// queues another read unless an error was reported.
// NOTE(review): every read asks for a full mBuffer.size() bytes, regardless of
// how many bytes of the file are still outstanding.
void AsyncTCPConnection::HandleReadFileContent(const boost::system::error_code& iErr, std::size_t iBytesTransferred)
{
if(iBytesTransferred>0)
{
mOutputFile.write(mBuffer.c_array(), (std::streamsize)iBytesTransferred);
LOG(LOGTRACE) << "Received " << mOutputFile.tellp() << " bytes";
if (mOutputFile.tellp()>=(std::streamsize)mFileSize)
{
/// file is received, send a simple ack message
/// this code is never reach when I launch the last async_read_until on the client side
char *wAckMsg = "ack[;]";
// NOTE(review): this handler is bound to the raw `this`, unlike the reads, which use shared_from_this().
boost::asio::async_write(mSocket, boost::asio::buffer(wAckMsg, strlen(wAckMsg)), boost::bind(&AsyncTCPConnection::HandleAcknowledge, this, boost::asio::placeholders::error));
// NOTE(review): no return here, so execution falls through to the checks and the async_read below.
}
}
if(iErr)
{
return HandleError(__FUNCTION__, iErr);
}
async_read(mSocket, boost::asio::buffer(mBuffer.c_array(), mBuffer.size()), boost::bind(&AsyncTCPConnection::HandleReadFileContent, shared_from_this(), boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred));
}
// Completion handler for the acknowledgement write. Logs success; on error
// logs the error code and message and throws std::exception.
void AsyncTCPConnection::HandleAcknowledge(const boost::system::error_code& iErr)
{
if(!iErr)
{
LOG(LOGDEBUG1) << "Message acknowledged";
return;
}
else
{
// in case of error free resources and bail
LOG(LOGERROR) << "Error value: " << iErr.value();
LOG(LOGERROR) << "Error message: " << iErr.message();
throw std::exception();
}
}
/// Logs the name of the failing handler together with the error code and its message.
void AsyncTCPConnection::HandleError(const std::string& function_name, const boost::system::error_code& err)
{
    const std::string wDescription = err.message();
    LOG(LOGERROR) << " in " << function_name << " due to " << err << " " << wDescription;
}
发送文件的代码:
boost::asio::io_service wIoService;
AsyncTCPClient client(wIoService, iServerIP, iFilePath);
wIoService.run();
我一直在寻找答案,却完全无法理解到底发生了什么、为什么会这样。提前致谢。
在服务器(接收)端,你重复
async_read(mSocket, boost::asio::buffer(mBuffer.c_array(), mBuffer.size()),
boost::bind(&AsyncTCPConnection::HandleReadFileContent, shared_from_this(),
boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred));
直到传输完成。以前这似乎总能正常工作,因为最后一次读取会以 EOF 返回:
This function is used to asynchronously read a certain number of bytes of data from a stream. The function call always returns immediately. The asynchronous operation will continue until one of the following conditions is true:
- The supplied buffer is full (that is, it has reached maximum size).
- An error occurred.
但是现在,由于客户端没有关闭套接字(因为它正在等待 ACK),最后一次 async_read 会一直等待,直到填满整个缓冲区才会完成。只有当文件大小恰好合适时,您才可能碰巧把它读完。
其他问题:
char *wAckMsg = "ack[;]";
boost::asio::async_write(
mSocket, boost::asio::buffer(wAckMsg, strlen(wAckMsg)),
boost::bind(&AsyncTCPConnection::HandleAcknowledge, this, boost::asio::placeholders::error));
在这5行中有多少个错误:
- 字符串字面量不能被当作 char* 使用,而必须被当作 char const (&)[] 处理。
- 缓冲区指向的是那个局部变量。同样,您可能只是运气好:字符串字面量通常驻留在静态数据段中,而局部变量只是指向它的指针,但这不是您应该依赖的东西。局部变量在异步操作进行期间已不复存在,这可能导致未定义行为(Undefined Behaviour),具体取决于所用的编译器。
- 您把完成处理程序绑定到了 this,而不是 shared_from_this()。这意味着对该连接的最后一个引用(mAsyncTCPConnectionPtr)会被释放,随后运行 ~AsyncTCPConnection 析构函数。这会销毁套接字,而这可能发生在 Ack 发送完成之前,也可能发生在之后。在多线程服务器中,由于数据竞争,这很容易导致未定义行为(Undefined Behaviour)。
- 发起 async_write 之后,您没有退出该函数。这意味着后面的代码仍然会被执行,包括接下来的
async_read(mSocket, boost::asio::buffer(mBuffer.c_array(), mBuffer.size()), ...
.
我建议将以下内容作为最低限度的修复:
static char const *s_wAckMsg = "ack[;]";
boost::asio::async_write(
mSocket, boost::asio::buffer(s_wAckMsg, strlen(s_wAckMsg)),
boost::bind(&AsyncTCPConnection::HandleAcknowledge, shared_from_this(), boost::asio::placeholders::error));
return;
其他小问题
请求解析不够稳健(既不处理错误,也不检查格式。最后您只是盲目地读掉 2 个字符,并假设它们就是 '\n\n',但这一点从未得到验证)。
如果打开输出文件失败,您报告的是错误的文件名(打开的是 mFilePath,日志里输出的却是 wFilePath)。
您可以使用 boost::asio::async_connect
而不是笨拙的处理程序链。
如果您只想打印它,则无需使用 istreambuf_iterator<char>
构建字符串。
让我们尝试修复它
我补写了不少代码,好让示例能够编译,并尝试通过限制每次读取的大小来修复这个错误。
async_read
调用重复,所以让我们删除重复:
/// Schedules the next read of file content.
/// The read size is the number of bytes still missing (announced size minus the current
/// output position), capped at one buffer.
void AsyncTCPConnection::DoReceiveFileContent() {
    // bytes of the announced size that have not been written yet; 0 once the file is complete
    size_t expect = 0;
    if (mFileSize > mOutputFile.tellp()) {
        expect = mFileSize - mOutputFile.tellp();
    }
    LOG(LOGDEBUG1) << "expectedContent: " << expect;

    // never ask for more than one buffer at a time
    if (expect > mBuffer.size()) {
        expect = mBuffer.size();
    }

    async_read(mSocket, boost::asio::buffer(mBuffer.c_array(), expect),
               boost::bind(&AsyncTCPConnection::HandleReadFileContent, shared_from_this(),
                           boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred));
}
现在只要我们想安排更多读取操作,我们就调用 DoReceiveFileContent()
。
演示
//#include "AsyncTCPConnection.h"
//#include "Debug.h"
#include <boost/asio.hpp>
#include <boost/bind.hpp>
#include <boost/enable_shared_from_this.hpp>
#include <boost/thread.hpp>
#include <fstream>
#include <iostream>
/// Fixed-size, zero-initialised character buffer exposing the size()/c_array()
/// accessors used by the connection and client code.
struct Buffer {
    /// Mutable access to the first byte of the storage.
    char *c_array() { return m_array; }

    /// Read-only access to the first byte of the storage.
    char const *c_array() const { return m_array; }

    /// Capacity of the storage in bytes.
    size_t size() const { return sizeof m_array; }

  private:
    char m_array[1024] = {};
};
/// One log line written to std::cout.
/// The constructor prints the tag followed by a tab, each operator<< appends one value,
/// and the object that is still armed when destroyed terminates the line with std::endl.
struct LogTx {
    LogTx(std::string const &name) { std::cout << name << "\t"; }

    /// Moving transfers the duty of ending the line to the new object.
    LogTx(LogTx &&other) : armed(other.armed) { other.armed = false; }

    ~LogTx() {
        if (!armed) {
            return;
        }
        std::cout << std::endl;
    }

    /// Streams a single value and passes the transaction on for further chaining.
    template <typename T> friend LogTx operator<<(LogTx tx, T &&value) {
        std::cout << value;
        return tx;
    }

  private:
    bool armed = true;
};
#define LOG(x) LogTx("LOG:" #x)
#define BIOLOG(x) LogTx("BIOLOG:" #x)
using boost::asio::ip::tcp;
// Receiving side of one file transfer. The handlers form a chain: Start() reads the request
// header, HandleReadRequest() parses it, DoReceiveFileContent()/HandleReadFileContent()
// receive the file body, and HandleAcknowledge() runs once the ack has been written.
// Derives from enable_shared_from_this so the handlers can bind shared_from_this().
struct AsyncTCPConnection : boost::enable_shared_from_this<AsyncTCPConnection> {
// iFilePath is the path the received file is written to.
AsyncTCPConnection(boost::asio::io_service &iIoService, const std::string iFilePath);
~AsyncTCPConnection();
// Starts reading the request header (everything up to "\n\n").
void Start();
// Parses the header and writes any body bytes that arrived with it.
void HandleReadRequest(const boost::system::error_code &iErr, std::size_t iBytesTransferred);
// Writes one received chunk to the output file; sends the ack when the file is complete.
void HandleReadFileContent(const boost::system::error_code &iErr, std::size_t iBytesTransferred);
// Schedules the next read, sized to the bytes still missing (at most one buffer).
void DoReceiveFileContent();
// Completion handler of the ack write.
void HandleAcknowledge(const boost::system::error_code &iErr);
// Logs a failed operation.
void HandleError(const std::string &function_name, const boost::system::error_code &err);
// Exposes the socket so an acceptor can accept into it.
tcp::socket &Socket() { return mSocket; }
private:
// Filled by async_read_until: the header plus any file bytes read along with it.
boost::asio::streambuf mRequestBuffer;
// Scratch buffer for the file-content reads.
Buffer mBuffer;
tcp::socket mSocket;
// File size announced in the request header.
std::streamsize mFileSize;
std::string mOutputFilePath;
std::ofstream mOutputFile;
};
/// Creates an idle connection: the socket is attached to iIoService, the expected file
/// size starts at 0 and iFilePath is remembered as the output path.
AsyncTCPConnection::AsyncTCPConnection(boost::asio::io_service &iIoService, const std::string iFilePath)
    : mSocket(iIoService),
      mFileSize(0),
      mOutputFilePath(iFilePath)
{
}
/// Nothing to release by hand; the members clean up after themselves.
AsyncTCPConnection::~AsyncTCPConnection() = default;
void AsyncTCPConnection::Start() {
LOG(LOGINFO) << "Start";
async_read_until(mSocket, mRequestBuffer, "\n\n",
boost::bind(&AsyncTCPConnection::HandleReadRequest, shared_from_this(),
boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred));
}
/// Completion handler for the header read started in Start().
///
/// Parses "<filename> <filesize>\n\n" from mRequestBuffer, validates it, opens the output
/// file, writes any file bytes that were read together with the header and then schedules
/// the remaining reads through DoReceiveFileContent().
///
/// @param iErr               result of the async_read_until operation
/// @param iBytesTransferred  byte count reported by async_read_until (only logged here)
void AsyncTCPConnection::HandleReadRequest(const boost::system::error_code &iErr, std::size_t iBytesTransferred) {
    if (iErr) {
        return HandleError(__FUNCTION__, iErr);
    }
    LOG(LOGTRACE) << "(" << iBytesTransferred << ")"
                  << ", in_avail = " << mRequestBuffer.in_avail() << ", size = " << mRequestBuffer.size();

    std::istream wRequestStream(&mRequestBuffer);
    std::string wFilePath;
    wRequestStream >> wFilePath;
    LOG(LOGTRACE) << "Original filename " << wFilePath;
    wRequestStream >> mFileSize;
    LOG(LOGTRACE) << "Original filesize " << mFileSize;

    // Consume the header terminator and verify that it really is "\n\n" instead of
    // blindly discarding two bytes.
    char wDelimiter[2] = {0, 0};
    wRequestStream.read(wDelimiter, 2);
    const bool wDelimiterOk = wDelimiter[0] == '\n' && wDelimiter[1] == '\n';
    if (!wRequestStream || mFileSize < 0 || !wDelimiterOk) {
        LOG(LOGERROR) << "Request could not be parsed";
        return;
    }

    // Only touch the output file after the request has been validated, so that a malformed
    // request does not create or truncate it.
    mOutputFile.open(mOutputFilePath, std::ios_base::binary);
    if (!mOutputFile) {
        LOG(LOGERROR) << "Failed to open: " << mOutputFilePath;
        return;
    }

    // The streambuf may already hold file content that followed the header; flush it to
    // the output file before reading more from the socket.
    do {
        wRequestStream.read(mBuffer.c_array(), (std::streamsize)mBuffer.size());
        LOG(LOGTRACE) << "Write " << wRequestStream.gcount() << " bytes";
        mOutputFile.write(mBuffer.c_array(), wRequestStream.gcount());
    } while (wRequestStream.gcount() > 0);

    DoReceiveFileContent();
}
/// Schedules the next read of file content.
/// The read size is the number of bytes still missing (announced size minus the current
/// output position), capped at one buffer.
void AsyncTCPConnection::DoReceiveFileContent() {
    // bytes of the announced size that have not been written yet; 0 once the file is complete
    size_t expect = 0;
    if (mFileSize > mOutputFile.tellp()) {
        expect = mFileSize - mOutputFile.tellp();
    }
    LOG(LOGDEBUG1) << "expectedContent: " << expect;

    // never ask for more than one buffer at a time
    if (expect > mBuffer.size()) {
        expect = mBuffer.size();
    }

    async_read(mSocket, boost::asio::buffer(mBuffer.c_array(), expect),
               boost::bind(&AsyncTCPConnection::HandleReadFileContent, shared_from_this(),
                           boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred));
}
/// Completion handler for the reads scheduled by DoReceiveFileContent().
///
/// Appends the received bytes to the output file. Once the output position has reached the
/// announced file size the "ack[;]" message is sent and no further read is scheduled;
/// otherwise the next read is scheduled unless the operation failed.
///
/// @param iErr               result of the async_read operation
/// @param iBytesTransferred  number of bytes placed into mBuffer
void AsyncTCPConnection::HandleReadFileContent(const boost::system::error_code &iErr, std::size_t iBytesTransferred) {
    if (iBytesTransferred > 0) {
        mOutputFile.write(mBuffer.c_array(), (std::streamsize)iBytesTransferred);
        LOG(LOGTRACE) << "Received " << mOutputFile.tellp() << " bytes (+" << iBytesTransferred << ")";
    }

    // Check for completion even when this read delivered nothing. If the whole file arrived
    // together with the request header (or the file is empty), DoReceiveFileContent() issues
    // a zero-length read; with the check nested under "bytes > 0" the handler would re-arm
    // that read forever and the ack would never be sent.
    if (mOutputFile.tellp() >= mFileSize) {
        LOG(LOGTRACE) << "Receive complete at " << mFileSize << " bytes";
        // static storage: the buffer must stay valid until the asynchronous write completes
        static char const s_wAckMsg[] = "ack[;]";
        boost::asio::async_write(
            mSocket, boost::asio::buffer(s_wAckMsg, sizeof(s_wAckMsg) - 1),
            boost::bind(&AsyncTCPConnection::HandleAcknowledge, shared_from_this(), boost::asio::placeholders::error));
        return;
    }

    if (iErr) {
        return HandleError(__FUNCTION__, iErr);
    }
    DoReceiveFileContent();
}
/// Completion handler for the acknowledgement write.
/// Logs success; on failure logs the error code and message and throws std::exception.
void AsyncTCPConnection::HandleAcknowledge(const boost::system::error_code &iErr) {
    if (iErr) {
        // the write failed: report it and bail out
        LOG(LOGERROR) << "Error value: " << iErr.value();
        LOG(LOGERROR) << "Error message: " << iErr.message();
        throw std::exception();
    }
    LOG(LOGDEBUG1) << "Message ACK sent";
}
/// Logs the name of the failing handler together with the error code and its message.
void AsyncTCPConnection::HandleError(const std::string &function_name, const boost::system::error_code &err) {
    const std::string wDescription = err.message();
    LOG(LOGERROR) << " in " << function_name << " due to " << err << " " << wDescription;
}
#include <boost/thread.hpp>
//#include "AsyncTCPClient.h"
// Sending side of the transfer: resolves and connects to the server, sends a request
// header followed by the file content in buffer-sized chunks, then reads the server's ack.
struct AsyncTCPClient {
// iServerIP has the form "host:port"; iPath is the file to send.
AsyncTCPClient(boost::asio::io_service &iIoService, const std::string &iServerIP, const std::string &iPath);
~AsyncTCPClient();
std::ifstream mSourceFile;
// mRequest holds the outgoing request header; mAckBuf receives the server's ack.
boost::asio::streambuf mRequest, mAckBuf;
tcp::resolver mResolver;
tcp::socket mSocket;
// Connects to the first resolved endpoint.
void HandleResolve(const boost::system::error_code &iErr, tcp::resolver::iterator iEndpointIterator);
// Sends the request on success, otherwise tries the next endpoint.
void HandleConnect(const boost::system::error_code &iErr, tcp::resolver::iterator iEndpointIterator);
// Sends the next chunk of the file, or starts reading the ack when the file is exhausted.
void HandleWriteFile(const boost::system::error_code &iErr);
// Completion handler of the ack read.
void HandleReceiveAcknowledge(const boost::system::error_code &iErr);
// Scratch buffer holding the chunk currently being sent.
Buffer mBuffer;
};
/// Prepares a transfer of iPath to the server named by iServerIP and starts resolving it.
///
/// On any setup failure (malformed address, unreadable file, unknown file size) an error is
/// logged and no asynchronous work is started.
///
/// @param iIoService  io_service that runs the client's asynchronous operations
/// @param iServerIP   server address in the form "host:port"
/// @param iPath       path of the file to send; also sent as the file name in the request
AsyncTCPClient::AsyncTCPClient(boost::asio::io_service &iIoService, const std::string &iServerIP,
                               const std::string &iPath)
    : mResolver(iIoService), mSocket(iIoService) {
    const size_t wPos = iServerIP.find(':');
    if (wPos == std::string::npos) {
        // previously this case returned silently, leaving the caller without any hint
        LOG(LOGERROR) << "Invalid server address (expected host:port): " << iServerIP;
        return;
    }
    const std::string wServerIP = iServerIP.substr(0, wPos);
    const std::string wPortStr = iServerIP.substr(wPos + 1);

    // open positioned at the end so that tellg() yields the file size
    mSourceFile.open(iPath, std::ios_base::binary | std::ios_base::ate);
    if (!mSourceFile) {
        LOG(LOGERROR) << "Failed to open file: " << iPath;
        return;
    }
    const std::streamoff wFileSize = mSourceFile.tellg();
    mSourceFile.seekg(0);
    if (wFileSize < 0 || !mSourceFile) {
        // tellg() reports failure as -1, which must not be announced as a file size
        LOG(LOGERROR) << "Failed to determine size of file: " << iPath;
        return;
    }

    // request header: "<path>\n<size>\n\n"
    std::ostream wRequestStream(&mRequest);
    wRequestStream << iPath << "\n" << wFileSize << "\n\n";
    LOG(LOGINFO) << "File to transfer: " << iPath;
    LOG(LOGINFO) << "Filesize: " << wFileSize << " bytes";

    tcp::resolver::query wQuery(wServerIP, wPortStr);
    mResolver.async_resolve(wQuery, boost::bind(&AsyncTCPClient::HandleResolve, this, boost::asio::placeholders::error,
                                                boost::asio::placeholders::iterator));
}
/// Nothing to release by hand; the members clean up after themselves.
AsyncTCPClient::~AsyncTCPClient() = default;
/// Completion handler of the name resolution: connects to the first endpoint and hands the
/// advanced iterator to HandleConnect so it can fall back to the next one.
void AsyncTCPClient::HandleResolve(const boost::system::error_code &iErr, tcp::resolver::iterator iEndpointIterator) {
    if (iErr) {
        LOG(LOGERROR) << "Error: " << iErr.message();
        return;
    }

    tcp::endpoint wEndpoint = *iEndpointIterator;
    mSocket.async_connect(wEndpoint, boost::bind(&AsyncTCPClient::HandleConnect, this,
                                                 boost::asio::placeholders::error, ++iEndpointIterator));
}
/// Completion handler of a connection attempt.
/// On success the request header is sent; on failure the next resolved endpoint is tried,
/// and the error is logged once no endpoint is left.
void AsyncTCPClient::HandleConnect(const boost::system::error_code &iErr, tcp::resolver::iterator iEndpointIterator) {
    if (!iErr) {
        // connected: send the request header, HandleWriteFile continues with the content
        boost::asio::async_write(mSocket, mRequest,
                                 boost::bind(&AsyncTCPClient::HandleWriteFile, this, boost::asio::placeholders::error));
        return;
    }

    if (iEndpointIterator == tcp::resolver::iterator()) {
        // every endpoint has been tried
        LOG(LOGERROR) << "Error: " << iErr.message();
        return;
    }

    // close the socket before retrying with the next endpoint
    mSocket.close();
    tcp::endpoint wEndpoint = *iEndpointIterator;
    mSocket.async_connect(wEndpoint, boost::bind(&AsyncTCPClient::HandleConnect, this,
                                                 boost::asio::placeholders::error, ++iEndpointIterator));
}
/// Completion handler of each write (request header or file chunk).
///
/// Reads the next chunk from the source file and sends it. Once the file is exhausted it
/// starts reading the server's "[;]"-terminated acknowledgement instead.
///
/// @param iErr  result of the preceding async_write
/// @throws std::exception when the preceding write failed
void AsyncTCPClient::HandleWriteFile(const boost::system::error_code &iErr) {
    if (iErr) {
        LOG(LOGERROR) << "Error value: " << iErr.value();
        LOG(LOGERROR) << "Error message: " << iErr.message();
        throw std::exception();
    }

    if (mSourceFile) {
        mSourceFile.read(mBuffer.c_array(), static_cast<std::streamsize>(mBuffer.size()));
        const std::streamsize wChunk = mSourceFile.gcount();
        if (wChunk > 0) {
            boost::asio::async_write(
                mSocket, boost::asio::buffer(mBuffer.c_array(), static_cast<std::size_t>(wChunk)),
                boost::bind(&AsyncTCPClient::HandleWriteFile, this, boost::asio::placeholders::error));
            return;
        }
        // A read of 0 bytes happens when the file size is an exact multiple of the buffer
        // size (or the file is empty). Returning here, as the code used to, meant the ack
        // was never awaited; fall through to the acknowledgement read instead.
        LOG(LOGINFO) << "EOF reached";
    }

    LOG(LOGINFO) << "File transmission done";
    // wait for the server's simple "ack[;]" confirming that the whole file was received
    boost::asio::async_read_until(
        mSocket, mAckBuf, "[;]",
        boost::bind(&AsyncTCPClient::HandleReceiveAcknowledge, this, boost::asio::placeholders::error));
}
/// Completion handler of the acknowledgement read.
/// Logs the received data on success; on failure logs the error code and message and
/// throws std::exception.
void AsyncTCPClient::HandleReceiveAcknowledge(const boost::system::error_code &iErr) {
    if (iErr) {
        // the read failed: report it and bail out
        LOG(LOGERROR) << "Error value: " << iErr.value();
        LOG(LOGERROR) << "Error message: " << iErr.message();
        throw std::exception();
    }
    // streaming the streambuf pointer prints the buffered content
    LOG(LOGDEBUG1) << "Acknowledged this data: " << &mAckBuf;
}
//////////////////////////////////////////////////
//////////////////////////////////////////////////
#include <boost/array.hpp>
#include <boost/bind.hpp>
#include <boost/enable_shared_from_this.hpp>
#include <boost/thread.hpp>
#include <fstream>
#include <iostream>
//#include "AsyncTCPClient.h"
//#include "AsyncTCPServer.h"
//#include "Debug.h"
// Minimal server for the demo: owns the io_service and an acceptor, accepts a connection
// and hands it to an AsyncTCPConnection.
struct AsyncTCPServer {
using mAsyncTCPConnectionPtr = boost::shared_ptr<AsyncTCPConnection>;
// Listens on iPort and runs the io_service inside the constructor; iFilePath is the path
// the received file is written to.
AsyncTCPServer(unsigned short iPort, const std::string iFilePath);
~AsyncTCPServer();
// Completion handler of the accept: starts the connection on success, logs otherwise.
void HandleAccept(mAsyncTCPConnectionPtr iCurConnection, const boost::system::error_code &iErr);
// Declared before mAcceptor, which is constructed from it.
boost::asio::io_service mIoService;
tcp::acceptor mAcceptor;
};
/// Opens the acceptor on iPort, queues one accept for a fresh connection that writes to
/// iFilePath, and then runs the io_service; the constructor returns when run() returns.
AsyncTCPServer::AsyncTCPServer(unsigned short iPort, const std::string iFilePath)
    : mAcceptor(mIoService, tcp::endpoint(tcp::v4(), iPort), true)
{
    mAsyncTCPConnectionPtr wNewConnection(new AsyncTCPConnection(mIoService, iFilePath));
    mAcceptor.set_option(tcp::acceptor::reuse_address(true));

    // the bound handler holds a copy of the shared pointer, keeping the connection alive
    tcp::socket &wPeerSocket = wNewConnection->Socket();
    mAcceptor.async_accept(wPeerSocket, boost::bind(&AsyncTCPServer::HandleAccept, this, wNewConnection,
                                                    boost::asio::placeholders::error));

    mIoService.run();
}
/// Stops the io_service when the server is destroyed.
AsyncTCPServer::~AsyncTCPServer()
{
    mIoService.stop();
}
/// Completion handler of the accept: logs a failed accept, otherwise starts the connection.
void AsyncTCPServer::HandleAccept(mAsyncTCPConnectionPtr iCurConnection, const boost::system::error_code &iErr) {
    if (iErr) {
        BIOLOG(BioSans::LOGERROR) << " " << iErr << ", " << iErr.message();
        return;
    }
    iCurConnection->Start();
}
int main() {
boost::thread th([] {
boost::asio::io_service wIoService;
AsyncTCPClient client(wIoService, "127.0.0.1:6767",
//"/etc/dictionaries-common/words"
"main.cpp"
);
boost::this_thread::sleep_for(boost::chrono::seconds(1));
wIoService.run();
});
AsyncTCPServer server(6767, "outputfile.txt");
}
输出:
LOG:LOGINFO File to transfer: main.cpp
LOG:LOGINFO Filesize: 12793 bytes
LOG:LOGINFO Start
LOG:LOGTRACE (16), in_avail = 512, size = 512
LOG:LOGTRACE Original filename main.cpp
LOG:LOGTRACE Original filesize 12793
LOG:LOGTRACE Write 496 bytes
LOG:LOGTRACE Write 0 bytes
LOG:LOGDEBUG1 expectedContent: 12297
LOG:LOGINFO File transmission done
LOG:LOGTRACE Received 1520 bytes (+1024)
LOG:LOGDEBUG1 expectedContent: 11273
LOG:LOGTRACE Received 2544 bytes (+1024)
LOG:LOGDEBUG1 expectedContent: 10249
LOG:LOGTRACE Received 3568 bytes (+1024)
LOG:LOGDEBUG1 expectedContent: 9225
LOG:LOGTRACE Received 4592 bytes (+1024)
LOG:LOGDEBUG1 expectedContent: 8201
LOG:LOGTRACE Received 5616 bytes (+1024)
LOG:LOGDEBUG1 expectedContent: 7177
LOG:LOGTRACE Received 6640 bytes (+1024)
LOG:LOGDEBUG1 expectedContent: 6153
LOG:LOGTRACE Received 7664 bytes (+1024)
LOG:LOGDEBUG1 expectedContent: 5129
LOG:LOGTRACE Received 8688 bytes (+1024)
LOG:LOGDEBUG1 expectedContent: 4105
LOG:LOGTRACE Received 9712 bytes (+1024)
LOG:LOGDEBUG1 expectedContent: 3081
LOG:LOGTRACE Received 10736 bytes (+1024)
LOG:LOGDEBUG1 expectedContent: 2057
LOG:LOGTRACE Received 11760 bytes (+1024)
LOG:LOGDEBUG1 expectedContent: 1033
LOG:LOGTRACE Received 12784 bytes (+1024)
LOG:LOGDEBUG1 expectedContent: 9
LOG:LOGTRACE Received 12793 bytes (+9)
LOG:LOGTRACE Receive complete at 12793 bytes
LOG:LOGDEBUG1 Message ACK sent
LOG:LOGDEBUG1 Acknowledged this data: ack[;]
确实文件是相同的:
d61f0515bc4ba003497d67e265b5e0bc main.cpp
d61f0515bc4ba003497d67e265b5e0bc outputfile.txt