Unable to find the reason for "Broken Pipe" error while sending continuous data chunks through Beast websocket

I am using the IBM Watson Speech to Text web service's streaming audio recognition API. I created a websocket in C++ (std 11) with the Boost (Beast 1.68.0) library.

I have successfully connected to the IBM server, and I want to send 231,296 bytes of raw audio data to the server as follows.

{
  "action": "start",
  "content-type": "audio/l16;rate=44100"
}

websocket.binary(true);
<bytes of binary audio data 50,000 bytes>
<bytes of binary audio data 50,000 bytes>
<bytes of binary audio data 50,000 bytes>
<bytes of binary audio data 50,000 bytes>
<bytes of binary audio data 31,296 bytes>

websocket.binary(false);
{
  "action": "stop"
}
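The frame sequence above (one text frame, five binary frames, one text frame) follows from splitting the audio into pieces of at most 50,000 bytes. A minimal sketch of that split; `plan_chunks` is a hypothetical helper, and the 50,000-byte cap mirrors the question's own buffer size rather than any limit documented by IBM:

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// Split `total` bytes of audio into binary-frame sizes of at most
// `max_chunk` bytes each, in the order they would be sent.
std::vector<std::size_t> plan_chunks(std::size_t total, std::size_t max_chunk) {
    assert(max_chunk > 0);  // a zero-sized chunk would never make progress
    std::vector<std::size_t> sizes;
    while (total > 0) {
        std::size_t n = total < max_chunk ? total : max_chunk;
        sizes.push_back(n);
        total -= n;
    }
    return sizes;
}
```

For the 231,296-byte file, `plan_chunks(231296, 50000)` yields four 50,000-byte chunks followed by one 31,296-byte chunk, matching the list above.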

The expected result from the IBM server is:

 {"results": [
      {"alternatives": [
            {  "confidence": xxxx, 
               "transcript": "call Rohan Chauhan "
            }],"final": true
      }], "result_index": 0
}

But I am not getting the desired result; instead, the error says "Broken pipe":

DataSize is: 50000 | mIsLast is : 0
DataSize is: 50000 | mIsLast is : 0
what : Broken pipe
DataSize is: 50000 | mIsLast is : 0
what : Operation canceled
DataSize is: 50000 | mIsLast is : 0
what : Operation canceled
DataSize is: 31296 | mIsLast is : 0
what : Operation canceled

Here is my code, which is an adaptation of the example given in the Beast library.

Foo.hpp

class IbmWebsocketSession : public std::enable_shared_from_this<IbmWebsocketSession> {
protected:
    char binarydata[50000];   // holds one chunk of audio (at most 50,000 bytes)
    std::string TextStart;    // JSON "start" message
    std::string TextStop;     // JSON "stop" message

public:
    explicit IbmWebsocketSession(net::io_context& ioc, ssl::context& ctx, SttService* ibmWatsonobj) :
        mResolver(ioc), mWebSocket(ioc, ctx) {
        TextStart = "{\"action\":\"start\",\"content-type\": \"audio/l16;rate=44100\"}";
        TextStop = "{\"action\":\"stop\"}";
    }

    /**********************************************************************
     * Desc  : Send start frame
     **********************************************************************/
    void send_start(beast::error_code ec);
    /**********************************************************************
     * Desc  : Send binary data
     **********************************************************************/
    void send_binary(beast::error_code ec);
    /**********************************************************************
     * Desc  : Send stop frame
     **********************************************************************/
    void send_stop(beast::error_code ec);
    /**********************************************************************
     * Desc  : Read the next chunk of binary data to be sent from the file
     **********************************************************************/
    void readFile(char *bdata, unsigned int *Len, unsigned int *start_pos, bool *ReachedEOF);

    // (Other members used in Foo.cpp, e.g. ws_, Datasize, StartPos, IsLast,
    //  on_ssl_handshake, on_binarysent and read_response, are not shown.)
};

Foo.cpp

void IbmWebsocketSession::on_ssl_handshake(beast::error_code ec) {
    if (ec)
        return fail(ec, "ssl_handshake");

    // Perform the websocket handshake, adding the Authorization header
    ws_.async_handshake_ex(host, "/speech-to-text/api/v1/recognize",
        [Token](request_type& reqHead) {
            reqHead.insert(http::field::authorization, Token);
        },
        bind(&IbmWebsocketSession::send_start, shared_from_this(), placeholders::_1));
}

void IbmWebsocketSession::send_start(beast::error_code ec) {
    if (ec)
        return fail(ec, "handshake");

    // Send the JSON "start" message as a text frame
    ws_.async_write(net::buffer(TextStart),
        bind(&IbmWebsocketSession::send_binary, shared_from_this(), placeholders::_1));
}

void IbmWebsocketSession::send_binary(beast::error_code ec) {
    if (ec)
        return fail(ec, "write");

    // Load the next chunk (at most 50,000 bytes) from the audio file
    readFile(binarydata, &Datasize, &StartPos, &IsLast);

    ws_.binary(true);
    if (!IsLast) {
        // Send this chunk; the completion handler reads and sends the next one
        ws_.async_write(net::buffer(binarydata, Datasize),
            bind(&IbmWebsocketSession::send_binary, shared_from_this(),
                 placeholders::_1));
    } else {
        IbmWebsocketSession::on_binarysent(ec);
    }
}

void IbmWebsocketSession::on_binarysent(beast::error_code ec) {
    if (ec)
        return fail(ec, "send_binary");

    // Switch back to text frames and send the JSON "stop" message
    ws_.binary(false);
    ws_.async_write(net::buffer(TextStop),
        bind(&IbmWebsocketSession::read_response, shared_from_this(), placeholders::_1));
}

void IbmWebsocketSession::readFile(char *bdata, unsigned int *Len, unsigned int *start_pos, bool *ReachedEOF) {
    unsigned int end = 0;
    unsigned int start = 0;
    unsigned int length = 0;

    // Open the audio file in binary mode
    ifstream infile(filepath, ifstream::binary);

    if (infile) {
        // Get the size of the file
        infile.seekg(0, ios::end);
        end = infile.tellg();

        // Move to the first byte that has not been sent yet
        infile.seekg(*start_pos, ios::beg);
        start = infile.tellg();

        // Number of bytes remaining after start_pos
        length = end - start;
    }

    if (length < 150) {
        // Fewer than 150 remaining bytes are treated as end of file; these
        // trailing bytes are neither read nor sent. If another process is still
        // writing the file, this branch can be hit before the real end of the audio.
        *Len = length;
        *ReachedEOF = true;

    } else if (length <= 50000) {   // maximum bytes to send per chunk: 50000
        *Len = length;
        *start_pos += length;
        *ReachedEOF = false;
        infile.read(bdata, length);

    } else {
        *Len = 50000;
        *start_pos += 50000;
        *ReachedEOF = false;
        infile.read(bdata, 50000);
    }

    infile.close();
}

Any suggestions?

From the Boost documentation of websocket::async_write, we have the following excerpt:

This function is used to asynchronously write a complete message. This call always returns immediately. The asynchronous operation will continue until one of the following conditions is true:

  1. The complete message is written.

  2. An error occurs.

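So when you pass it a buffer such as net::buffer(TextStart), keep in mind that net::buffer does not copy anything: it only refers to the memory of TextStart. According to the documentation, the asynchronous write may still be using that memory after async_write returns, so if the memory belongs to a local variable, its contents are no longer valid by the time the write actually happens.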

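To solve this, you can make TextStart static, or declare it as a member of the class and pass that member to boost::asio::buffer, as long as the object stays alive until the completion handler runs (binding shared_from_this() into the handler keeps the session alive). There are many examples of how to do that. Note that I only mentioned TextStart in the IbmWebsocketSession::send_start function; the same issue applies throughout the code.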
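To make the lifetime rule concrete without a network connection, here is a standard-library-only sketch (C++17). `FakeIoContext`, `fake_async_write` and `Session` are hypothetical stand-ins, not Beast or Asio types. Like `net::buffer`, the fake write stores only a view of the bytes and reads them later from a queued task, and the session keeps its member string alive by capturing `shared_from_this()` in the completion handler:

```cpp
#include <cassert>
#include <functional>
#include <memory>
#include <queue>
#include <string>
#include <string_view>
#include <utility>

// Hypothetical stand-in for an io_context: posted tasks run later, in run(),
// after the function that started the "asynchronous" operation has returned.
struct FakeIoContext {
    std::queue<std::function<void()>> tasks;
    void post(std::function<void()> f) { tasks.push(std::move(f)); }
    void run() {
        while (!tasks.empty()) {
            std::function<void()> f = std::move(tasks.front());
            tasks.pop();
            f();  // f, and everything it captured, is destroyed after this call
        }
    }
};

// Hypothetical async write: like net::buffer, it keeps only a view
// (pointer + size) and copies the bytes to `wire` when the task runs.
void fake_async_write(FakeIoContext& ioc, std::string_view buf, std::string& wire,
                      std::function<void()> handler) {
    ioc.post([buf, &wire, handler = std::move(handler)] {
        wire.append(buf.data(), buf.size());
        handler();
    });
}

class Session : public std::enable_shared_from_this<Session> {
public:
    Session(FakeIoContext& ioc, std::string& wire) : ioc_(ioc), wire_(wire) {}

    void send_start() {
        assert(!text_start_.empty());
        // Capturing `self` keeps this Session, and therefore text_start_,
        // alive until the completion handler has run.
        auto self = shared_from_this();
        fake_async_write(ioc_, text_start_, wire_, [self] { self->start_sent_ = true; });
    }

private:
    FakeIoContext& ioc_;
    std::string& wire_;
    std::string text_start_ = R"({"action":"start","content-type":"audio/l16;rate=44100"})";
    bool start_sent_ = false;
};
```

If the caller drops its `shared_ptr` right after `send_start()`, the queued task still owns the session, so the bytes are valid when `run()` finally writes them; the session is released only after the handler finishes. Passing a view of a local `std::string` instead would leave the task reading freed memory.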

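According to IBM Watson's API definition, "Initiate a connection" requires a certain format, which can then be represented as a string. You have the string but are missing the correct format, so the connection is closed by the peer and you are writing to a closed socket, hence the broken pipe.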
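"Broken pipe" is the text of the POSIX error `EPIPE`, which a write reports when the other end of a stream connection has already been closed. A small POSIX-only sketch that reproduces it locally with a Unix-domain socket pair; `write_after_peer_close` is a hypothetical helper, and `SIGPIPE` is ignored so the failed write returns an error instead of terminating the process:

```cpp
#include <cassert>
#include <cerrno>
#include <csignal>
#include <sys/socket.h>
#include <sys/types.h>
#include <unistd.h>

// Write to a stream socket whose peer has already closed its end and
// return the resulting errno (0 if the write unexpectedly succeeded).
int write_after_peer_close() {
    // Without this, the write would raise SIGPIPE and kill the process.
    std::signal(SIGPIPE, SIG_IGN);

    int fds[2];
    int rc = socketpair(AF_UNIX, SOCK_STREAM, 0, fds);
    assert(rc == 0);
    (void)rc;

    close(fds[1]);  // the "server" end goes away

    const char chunk[] = "audio";
    ssize_t n = write(fds[0], chunk, sizeof chunk);
    int err = (n < 0) ? errno : 0;  // save errno before close() can change it
    close(fds[0]);
    return err;
}
```

The function returns `EPIPE`, whose `std::strerror` text is "Broken pipe" on common platforms such as glibc and macOS. Over a real TCP connection the first write after the peer closes can still appear to succeed, with a later write failing, which is consistent with the log where two chunks went out before the error appeared.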

Initiating a connection requires:

  var message = {
    action: 'start',
    'content-type': 'audio/l16;rate=22050'
  };

Adapted to your requirements, this could be represented as string TextStart = "action: 'start',\r\ncontent-type: 'audio/l16;rate=44100'"
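The JavaScript object above only becomes a WebSocket text frame once it is serialized. Assuming it is serialized with `JSON.stringify` (the usual way to send such an object), the frame is compact JSON. A minimal C++ sketch that builds the same string; `make_start_message` is a hypothetical helper and performs no JSON escaping, so the content type must not contain quotes or backslashes:

```cpp
#include <cassert>
#include <string>

// Build the JSON text that JSON.stringify({action: 'start', 'content-type': ct})
// would produce. No escaping is performed (see the precondition below).
std::string make_start_message(const std::string& content_type) {
    assert(content_type.find_first_of("\"\\") == std::string::npos);
    return "{\"action\":\"start\",\"content-type\":\"" + content_type + "\"}";
}
```

JSON allows whitespace between tokens, so a space after a colon, as in the question's `TextStart`, does not change the meaning of the message.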

Following the discussion in chat, the OP solved the problem by adding this code:

if (!IsLast) {
    ws_.async_write(net::buffer(binarydata, Datasize),
        bind(&IbmWebsocketSession::send_binary, shared_from_this(),
             placeholders::_1));
} else {
    if (mIbmWatsonobj->IsGstFileWriteDone()) {   // checks for the file write completion
        IbmWebsocketSession::on_binarysent(ec);
    } else {
        // File still being written: wait 1 second, then try to read more.
        // (sleep_for blocks the thread that runs the io_context meanwhile.)
        std::this_thread::sleep_for(std::chrono::seconds(1));
        IbmWebsocketSession::send_binary(ec);
    }
}

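The discussion stemmed from the fact that bytes were being sent to the server before the file write on those same bytes had completed. The OP now checks for write completion before trying to send more bytes.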
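A sketch of the same idea without the fixed 150-byte EOF threshold: read whatever bytes exist past the current offset, and treat an empty read as the end of the audio only once the writer has finished. `read_chunk`, `Step` and `next_step` are hypothetical names; how `writer_done` is obtained (the question's `IsGstFileWriteDone()` or something else) is left to the caller:

```cpp
#include <cassert>
#include <cstddef>
#include <ios>
#include <istream>
#include <sstream>
#include <string>

// Read up to `max_len` bytes starting at `offset`. Returns the number of
// bytes actually read, which is 0 when no data exists past `offset` yet.
std::size_t read_chunk(std::istream& in, std::size_t offset, char* out, std::size_t max_len) {
    assert(out != nullptr);
    in.clear();  // forget EOF/fail state left by a previous short read
    in.seekg(static_cast<std::streamoff>(offset), std::ios::beg);
    if (!in)
        return 0;  // offset is past the current end of the data
    in.read(out, static_cast<std::streamsize>(max_len));
    return static_cast<std::size_t>(in.gcount());
}

enum class Step { Send, Wait, Stop };

// Decide what to do after a read. `writer_done` must be sampled *before* the
// read; otherwise the writer could append bytes between the read and the check.
Step next_step(std::size_t bytes_read, bool writer_done) {
    if (bytes_read > 0)
        return Step::Send;
    return writer_done ? Step::Stop : Step::Wait;
}
```

The caller advances its offset by the returned count, sends a binary frame on `Send`, sends the stop message on `Stop`, and retries later on `Wait`. In an Asio program the `Wait` case is better served by a timer (for example `net::steady_timer` with `async_wait`) than by `std::this_thread::sleep_for`, which blocks the thread running the io_context.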