无法从通过管道 C++ (Windows) 重定向的程序中获得正确的输出

Cannot get correct output from program redirected via pipe C++ (Windows)

我想实现一个结果,让程序的输出拆分到双端队列结构。

问题描述:我正在处理重定向使用 CreateProcess 创建的程序的输出。我需要读取程序的输出并逐行处理它。该程序本身以方便的形式提供输出,但我通过未命名管道接收的输出传送速度较慢(频率较低),它们出现在部分中,并且最后一行在某处被减半。来自管道的下一部分流将匹配并完成该行,但它会给程序的构建带来一些问题。

程序的 cout 与通过管道重定向的输出之间存在这种差异的原因是什么?

编辑:根据提议 user4581301 我试过使用 stringstream 和 getline,但似乎这些行仍然被切成两半,即使从程序直接 cout 没有重定向到管道没有这个问题。这导致了问题,行在队列的不同元素中被拆分(请查看下面的代码)。

Sample from the console

方法 ReadFromPipe 运行 在循环中。

void ProcessManagement::ReadFromPipe(void)

// Read output from the child process's pipe for STDOUT

{
    DWORD dwRead, dwWritten;
    char buffer[BUFSIZE];
    BOOL bSuccess = FALSE;
    std::deque<std::deque<std::string>> elems;

while (ReadFile(g_hChildStd_OUT_Rd, buffer, sizeof(buffer)-1, &dwRead, NULL) != FALSE)
{
    /* add terminating zero */
    buffer[dwRead] = '[=11=]';
    std::stringstream streamBuffer;
    streamBuffer << buffer;
    BufferToQueue(streamBuffer, ' ', elems);

    // Print deque
    for (std::deque <std::string> a : elems)
    {
        for (std::string b : a)
            std::cout << b;
        std::cout << std::endl;
    }

    }
}

以及 BufferToQueue 方法。

void ProcessManagement::BufferToQueue(std::stringstream &streamBuffer, char delim, std::deque<std::deque<std::string>> &elems) {
    std::string line;
    std::string word;
    std::deque<string> row;

 // Splitting the stringstream into queue of queue (does not work properly)
    while (std::getline(streamBuffer, line))
    {
        std::istringstream iss(line);
        while (std::getline(iss, word, delim))
            row.push_back(word);
    elems.push_back(row);
    row.clear();
    }
}

扩展@Captain Obvlious 关于同花顺的评论:

您遇到的问题是因为 WriteToPipe 函数没有在行尾刷新。如果前一个 ReadFromPipe 调用没有 newline 作为最后一个字符,您可以通过确保追加到前一个字符串来在 reader 中解决此问题。

修改函数:

bool ProcessManagement::BufferToQueue(std::stringstream &streamBuffer, char     delim, std::deque<std::deque<std::string>> &elems)
{
    std::string line;
    std::string word;
    std::deque<string> row;

    bool is_unflushed_line = streamBuffer.str().back() != '\n';

    // Splitting the stringstream into queue of queue (does not work properly)
    while (std::getline(streamBuffer, line))
    {
        std::istringstream iss(line);
        while (std::getline(iss, word, delim)) {
            row.push_back(word);
        }
        elems.push_back(row);
        row.clear();
    }
    return is_unflushed_line;
}

void ProcessManagement::ReadFromPipe(void)
// Read output from the child process's pipe for STDOUT
{
    DWORD dwRead, dwWritten;
    char buffer[BUFSIZE];
    BOOL bSuccess = FALSE;
    std::deque<std::deque<std::string>> elems;

    while (ReadFile(g_hChildStd_OUT_Rd, buffer, sizeof(buffer)-1, &dwRead, NULL) != FALSE)
    {
        /* add terminating zero */
        buffer[dwRead] = '[=10=]';
        std::stringstream streamBuffer;
        streamBuffer << buffer;
        bool is_unflushed_line = BufferToQueue(streamBuffer, ' ', elems);

        for(auto idx = 0; idx != elems.size(); ++idx)
        {
            for (std::string const& b : elems[idx])
                std::cout << b;
            if(idx == elems.size() - 1 && is_unflushed_line)
                break;// don't print a newline if input did not end with a newline
            std::cout << std::endl;
        }
    }
}

@indeterminately sequenced 的回答是正确的,感谢您的帮助。问题是,管道复制到的缓冲区太小,它正在将其分配给单独的缓冲区。

基于@indeterminately sequenced 将输出推送到队列结构的帮助的完整解决方案,也许它会对某人有所帮助。唯一的问题是打开的程序永远不会关闭,必须在某处使用TerminateProcess函数。

void ProcessManagement::CreateChildProcess()
// Create a child process that uses the previously created pipes for STDIN and STDOUT.
{
    SECURITY_ATTRIBUTES saAttr = { sizeof(SECURITY_ATTRIBUTES) };

    saAttr.bInheritHandle = TRUE;   //Pipe handles are inherited by child process.
    saAttr.lpSecurityDescriptor = NULL;

    // Create a pipe to get results from child's stdout.
    if (!CreatePipe(&hPipeRead, &hPipeWrite, &saAttr, 0))
        return;

    si.dwFlags = STARTF_USESHOWWINDOW | STARTF_USESTDHANDLES;
    si.hStdOutput = hPipeWrite;
    si.hStdError = hPipeWrite;
    si.wShowWindow = SW_HIDE;       // Prevents cmd window from flashing. Requires STARTF_USESHOWWINDOW in dwFlags.

    std::string command_temp = (" -i \"LAN 3\"");
    LPSTR st = const_cast<char *>(command_temp.c_str());

    BOOL fSuccess = CreateProcessA(program_path.c_str(), st, NULL, NULL, TRUE, CREATE_NEW_CONSOLE, NULL, NULL, &si, &pi);
    if (!fSuccess)
    {
        CloseHandle(hPipeWrite);
        CloseHandle(hPipeRead);
        return ;
    }

    /* Needs to be used somewhere
    TerminateProcess(pi.hProcess,exitCode);
    CloseHandle(hPipeWrite);
    CloseHandle(hPipeRead);
    CloseHandle(pi.hProcess);
    CloseHandle(pi.hThread);*/
}

void ProcessManagement::ReadFromPipe(void)
// Read output from the child process's pipe for STDOUT
{
    std::deque<std::deque<std::string>> elems;

    // Give some timeslice (50ms), so we won't waste 100% cpu.
    bProcessEnded = WaitForSingleObject(pi.hProcess, 50) == WAIT_OBJECT_0;

    // Even if process exited - we continue reading, if there is some data available over pipe.
    for (;;)
    {
        char buf[8192];
        DWORD dwRead = 0;
        DWORD dwAvail = 0;

        if (!::PeekNamedPipe(hPipeRead, NULL, 0, NULL, &dwAvail, NULL))
            break;

        if (!dwAvail) // no data available, return
            break;
        if (!::ReadFile(hPipeRead, buf, min(sizeof(buf)-1, dwAvail), &dwRead, NULL) || !dwRead)
            // error, the child process might ended
            break;
        buf[dwRead] = '[=10=]';
        std::stringstream streamBuffer;
        streamBuffer << unflushed_line << buf; // add to buffer also last unflashed line
        unflushed_line = BufferToQueue(streamBuffer, ' ', elems);

        for (auto idx = 0; idx != elems.size(); ++idx)
        {
            for (std::string const& b : elems[idx])
                std::cout << b;

            std::cout << std::endl;
        }

    }
}

std::string ProcessManagement::BufferToQueue(std::stringstream &streamBuffer, char delim, std::deque<std::deque<std::string>> &elems) {
    std::string line;
    std::string word;
    std::deque<string> row;

    bool is_unflushed_line = streamBuffer.str().back() != '\n';

    // Splitting the stringstream into queue of queue (does not work properly)
    while (std::getline(streamBuffer, line, '\n'))
    {
        std::istringstream iss(line);
        while (std::getline(iss, word, delim)) {
            row.push_back(word);
        }
        elems.push_back(row);
        row.clear();
    }

    if (is_unflushed_line)
    {
        elems.pop_back(); // pop not fully flushed line
    }
    else line.clear(); // if the line was fully flushed return empty string

    return line; // to add to buffer for next push to queue if the last was not flushed at the end

}