为什么 Windows 任务管理器在写入非常大的文件时显示内存增加?我应该担心吗?

Why does Windows task manager show memory increases while writing very large files? Should I be worried?

我正在开发一个 c++ 应用程序(在 VS2012 中,Windows Server 2012 R2),它将大量二进制数据从已分配的循环缓冲区数组写入原始文件。问题是 Windows 任务管理器报告的系统 RAM 使用率随着 fwrite 将数据写入文件而线性增加,直到达到几乎保持不变的某个点(另请参见下图)。此外,我的应用程序使用的内存始终保持不变。

我定期调用 fflush,但没有效果。虽然这看起来是无害的,但我担心它会影响性能,因为在正常运行时还会有另一个 Java 应用程序同时运行。

因此,我想问一下我是否应该担心这个问题,以及是否有办法避免这个问题,以实现实时数据记录系统的最佳性能。

针对 Linux 操作系统,这里和这里也有人提出过类似的问题;据说只要有足够的可用内存,系统就会拿出一部分内存来缓存数据。

下面给出应用程序的部分代码。简而言之,应用程序控制一对相机,每个相机获取帧并将它们存储在正确对齐的已分配缓冲区中。其中有 i) 一个 CameraInterface 类,它创建两个 "producer"(生产者)线程,ii) 一个 Recorder 类,它创建两个 "consumer"(消费者)线程,以及 iii) 一个 SharedMemoryManager 类,它为生产者提供一个可用的缓冲区来存储数据,并为消费者提供下一个要写入文件的缓冲区。SharedMemoryManager 包含两个缓冲区数组(每对生产者-消费者一个)和两个相应的标志数组,用于指示缓冲区的状态。它还包含两个 std::queue 对象,用于快速访问下一个要记录的缓冲区。下面显示 Recorder 和 SharedMemoryManager 的部分代码。

// somewhere in file "atcore.h"...
// Byte type (plain unsigned char) used for the raw frame buffers handled below.
typedef unsigned char AT_U8;

// File: SharedMemoryManager.h
#ifndef __MEM_MANAGER__
#define __MEM_MANAGER__

#pragma once

#include "atcore.h"
#include <queue>
#include <mutex>

// Number of frame buffers (and matching status flags) allocated per camera.
#define NBUFFERS 128

// Per-buffer status values stored in sharedMemory::flags.
#define BUFFER_AVAILABLE 0           // initial state set by allocateBuffers
#define BUFFER_QUEUED 1
#define BUFFER_FULL 2                // state required by getNextFrameForRecording
#define BUFFER_RECORDING_PENDING 3   // set once the buffer is handed to the recorder

// The status flag cycle is
// AVAILABLE -> QUEUED -> FULL -> RECORDING_PENDING -> AVAILABLE
// (the original note called the first state "EMPTY"; presumably that is
// BUFFER_AVAILABLE, since no EMPTY value is defined here).

using namespace std;

// Per-camera pool of frame buffers together with their status flags.
typedef struct{
    AT_U8** buffers;      // array of NBUFFERS buffer pointers (filled by allocateBuffers)
    int* flags;           // array of NBUFFERS BUFFER_* status values, one per buffer
    int acquiredCounter;  // NOTE(review): usage not visible in this excerpt
    int consumedCounter;  // NOTE(review): usage not visible in this excerpt
    int queuedCounter;    // NOTE(review): usage not visible in this excerpt
    mutex flagMtx;        // held while flags[] is inspected/updated in getNextFrameForRecording
} sharedMemory;

// Queue entry describing one frame that is waiting to be recorded.
typedef struct{
    AT_U8* buffer;  // presumably the buffer holding the frame data - confirm against the producer side
    int bufSize;    // presumably the size of that frame in bytes - confirm against the producer side
    int index;      // position of the buffer inside sharedMemory::buffers / flags
} record;

// Owns the frame buffers of two cameras (identifiers 0 and 1) and hands them
// out to the acquisition side and to the recording side.
class SharedMemoryManager
{
public:
    SharedMemoryManager();
    ~SharedMemoryManager(void);

    void enableRecording();
    void disableRecording();
    int setupMemory(int cameraIdentifier, int bufferSize);
    void freeMemory();
    void freeCameraMemory(int cameraIdentifier); 
    int getBufferSize(int cameraIdentifier);
    AT_U8* getBufferForCameraQueue(int cameraIdentifier);  // get pointer to the next available buffer for queueing in the camera
    int hasFramesForRecording(int cameraIdentifier);  // number of frames waiting in the respective recording queue; -1 for an invalid camera id
    AT_U8* getNextFrameForRecording(int cameraIdentifier);  // get pointer to the next buffer to be recorded to a file; NULL if there is none
    void copyMostRecentFrame(unsigned char* buffer, int cameraIdentifier); // TODO (original author's marker): get a copy of the most recent frame on the buffer
    void notifyAcquiredFrame(AT_U8* buffer, int bufSize, int cameraIdentifier);      // use this function to notify the manager that the buffer has just been filled with data
    void notifyRecordedFrame(AT_U8* buffer, int cameraIdentifier);  // use this function to notify the manager that the buffer has just been written to file and can be used again

private:
    bool useMem0, useMem1;
    int bufSize0, bufSize1;
    sharedMemory* memory0;                // buffer pool for camera 0
    sharedMemory* memory1;                // buffer pool for camera 1
    queue<record*> framesForRecording0;   // records waiting to be written, camera 0
    queue<record*> framesForRecording1;   // records waiting to be written, camera 1
    bool isRecording;

    // Allocates the NBUFFERS buffers and the flag array of `mem`; returns 0 on success, -1 on failure.
    int allocateBuffers(sharedMemory* mem, int bufSize);
    // Releases every buffer, the flag array and `mem` itself.
    void freeBufferArray(sharedMemory* mem);
};

#endif  // !__MEM_MANAGER


// File: SharedMemoryManager.cpp
...
// Reports how many frames are currently queued for recording for one camera.
// Returns the length of the matching queue for camera 0 or 1; any other
// identifier is logged to stdout and yields -1.
int SharedMemoryManager::hasFramesForRecording(int cameraIdentifier){
    switch (cameraIdentifier){
    case 0:
        return (int)framesForRecording0.size();
    case 1:
        return (int)framesForRecording1.size();
    default:
        cout << "Could not get the number of frames in the shared memory. Invalid camera id " << cameraIdentifier << endl;
        return -1;
    }
}

// Pops the oldest pending record of the given camera and returns the buffer
// it refers to, moving that buffer from BUFFER_FULL to BUFFER_RECORDING_PENDING.
//
// Returns NULL when the camera id is not 0 or 1, when no frame is queued, or
// when the referenced buffer is not in the BUFFER_FULL state (in which case
// all flags are dumped to stdout for diagnosis). The popped record is deleted
// in every case where the queue was non-empty.
//
// NOTE(review): the queue itself is accessed here without holding flagMtx;
// confirm that the producer side does not push concurrently without
// synchronization.
AT_U8* SharedMemoryManager::getNextFrameForRecording(int cameraIdentifier){
    if (cameraIdentifier!=0 && cameraIdentifier!=1){
        cout << "Error in getNextFrameForRecording. Invalid camera id " << cameraIdentifier << endl;
        return NULL;
    }

    // Select the memory pool and the queue of the requested camera in one place.
    const bool isCamera0 = (cameraIdentifier==0);
    sharedMemory* mem = isCamera0 ? memory0 : memory1;
    queue<record*>& pending = isCamera0 ? framesForRecording0 : framesForRecording1;

    if (pending.empty()){  // no frames to be recorded at the moment
        return NULL;
    }

    record* item = pending.front();
    pending.pop();
    const int idx = item->index;
    delete item;

    AT_U8* buffer = NULL;

    // RAII lock: released on every exit path, including an exception thrown
    // while logging, which the manual lock()/unlock() pair did not guarantee.
    lock_guard<mutex> guard(mem->flagMtx);
    if (mem->flags[idx] == BUFFER_FULL){
        mem->flags[idx] = BUFFER_RECORDING_PENDING;
        buffer = mem->buffers[idx];
    }
    else{
        cout << "PROBLEM. Buffer in getBufferForRecording. Buffer flag is " << mem->flags[idx] << endl;
        cout << "----- BUFFER FLAGS -----" << endl;
        for (int i=0; i<NBUFFERS; i++){
            cout << "[" << i << "] " << mem->flags[i] << endl;
        }
        cout << "----- -----" << endl;
    }
    return buffer;
}

// Allocates the pointer array, the flag array and NBUFFERS 8-byte-aligned
// buffers of `bufSize` bytes each for `mem`, marking every buffer as
// BUFFER_AVAILABLE.
//
// Returns 0 on success and -1 on failure. On failure everything allocated so
// far is released and mem->buffers / mem->flags are reset to NULL, so a later
// cleanup cannot free or dereference stale pointers.
// A NULL `mem` or a non-positive `bufSize` is rejected up front (a negative
// size would otherwise wrap to a huge size_t).
int SharedMemoryManager::allocateBuffers(sharedMemory* mem, int bufSize){
    if (mem==NULL || bufSize<=0){
        cout << "Could not allocate buffers. Invalid memory descriptor or buffer size." << endl;
        return -1;
    }

    // Start from a known state so every failure path leaves NULL pointers behind.
    mem->buffers = NULL;
    mem->flags = NULL;

    // allocate the array for the buffers (calloc: all entries start as NULL)
    mem->buffers = (AT_U8**)calloc(NBUFFERS,sizeof(AT_U8*));
    if (mem->buffers==NULL){
        cout << "Could not allocate array of buffers." << endl;
        return -1;
    }
    // allocate the array for the respective flags
    mem->flags = (int*)malloc(NBUFFERS*sizeof(int));
    if (mem->flags==NULL){
        cout << "Could not allocate array of flags for the buffers." << endl;
        free(mem->buffers);
        mem->buffers = NULL;
        return -1;
    }

    for (int i=0; i<NBUFFERS; i++){ // allocate the buffers
        mem->buffers[i] = (AT_U8*)_aligned_malloc((size_t)bufSize,8);
        if (mem->buffers[i] == NULL){
            cout << "Could not allocate memory for buffer no. " << i << endl;
            for (int j=0; j<i; j++){  // free the previously allocated buffers
                _aligned_free(mem->buffers[j]);
            }
            free(mem->buffers);
            mem->buffers = NULL;
            free(mem->flags);
            mem->flags = NULL;
            return -1;
        }
        mem->flags[i]=BUFFER_AVAILABLE;
    }

    return 0;
}

// Releases all NBUFFERS aligned buffers of `mem`, its pointer array, its flag
// array and finally `mem` itself. A NULL `mem` is ignored, and a NULL
// mem->buffers (nothing allocated) is tolerated instead of being dereferenced.
//
// `mem` is passed by value, so the caller's own pointer is left dangling and
// must be reset by the caller; the former `mem = NULL;` here had no effect.
//
// NOTE(review): sharedMemory holds a std::mutex yet is released with free();
// confirm how the struct is allocated and that this is intended.
void SharedMemoryManager::freeBufferArray(sharedMemory* mem){
    if (mem==NULL){
        return;
    }

    if (mem->buffers!=NULL){
        for(int i=0; i<NBUFFERS; i++){
            _aligned_free(mem->buffers[i]);
            mem->buffers[i]=NULL;
        }
        free(mem->buffers);
        mem->buffers = NULL;
    }

    free(mem->flags);  // free(NULL) is a no-op
    mem->flags = NULL;

    free(mem);
}

// File: Recorder.h
#ifndef __RECORDER__
#define __RECORDER__

#pragma once

#include <string>
#include <queue>
#include <future>
#include <thread>
#include "atcore.h"
#include "SharedMemoryManager.h"

using namespace std;

// Writes the frames of up to two cameras ("chunks" 0 and 1) to files, using
// one recording thread per chunk that pulls buffers from the shared
// SharedMemoryManager.
class Recorder
{
public:
    Recorder(SharedMemoryManager* memoryManager);
    ~Recorder();

    void recordBuffer(AT_U8 *buffer, int bufsize);
    int setupRecording(string filename0, string filename1, bool open0, bool open1);
    // Starts one recording thread per enabled chunk unless recording is already active.
    void startRecording();
    void stopRecording();
    int testWriteSpeed(string directoryPath, string filename);
    void insertFrameItem(AT_U8* buffer, int bufSize, int chunkID);

private:
    FILE *chunk0, *chunk1;                  // output files written by writeNextItem
    string chunkFilename0, chunkFilename1;
    int frameCounter0, frameCounter1;       // incremented by writeNextItem, one counter per chunk
    bool writes0, writes1;                  // whether startRecording spawns a thread for chunk 0 / 1
    int bufSize0, bufSize1;                 // number of bytes written per frame for chunk 0 / 1

    static SharedMemoryManager* manager;    // shared by all instances; used by the static thread entry point

    bool isRecording;                       // read by the recording threads to decide when to stop

    promise<int> prom0;                     // handed to the recording thread of chunk 0
    promise<int> prom1;                     // handed to the recording thread of chunk 1
    thread* recordingThread0;               // created with new in startRecording
    thread* recordingThread1;               // created with new in startRecording

    // Thread entry point: writes queued frames of one chunk until recording
    // has stopped and the queue is drained.
    static void performRecording(promise<int>* exitCode, int chunkIdentifier);
    // Writes the next queued frame of the given chunk to its file.
    void writeNextItem(int chunkIdentifier);
    void closeFiles();

};

#endif //!__RECORDER__

// File: Recorder.cpp
#include "Recorder.h"
#include <ctime>
#include <iostream>
using namespace std;

// File-scope state that lets the static thread entry point
// Recorder::performRecording reach non-static members.
// NOTE(review): the place where these are assigned is not visible in this
// excerpt; they must be set before startRecording spawns any thread.
Recorder* recorderInstance; // keep a pointer to the current instance, for accessing static functions from (non-static) objects in the threads
SharedMemoryManager* Recorder::manager;  // the same reason
...    
// Marks the recorder as active and launches one recording thread per enabled
// chunk. Does nothing when recording is already in progress, so no second set
// of threads is started while earlier ones may still be running.
void Recorder::startRecording(){
    if (isRecording){
        return;
    }

    isRecording = true;

    if (writes0){
        recordingThread0 = new thread(&Recorder::performRecording, &prom0, 0);
    }
    if (writes1){
        recordingThread1 = new thread(&Recorder::performRecording, &prom1, 1);
    }
}

// Fetches the next frame queued for the given chunk (0 selects chunk 0, any
// other value chunk 1), writes it to the chunk's file, hands the buffer back
// to the manager and bumps the chunk's frame counter.
//
// If the manager has no frame to offer (NULL buffer) the call returns without
// writing, notifying or counting; previously the NULL pointer was passed
// straight to fwrite and notifyRecordedFrame.
// Write problems are reported on stdout; the buffer is still released so it
// does not stay stuck in the pending state.
void Recorder::writeNextItem(int chunkIdentifier){
    const bool isChunk0 = (chunkIdentifier==0);

    AT_U8* buffer = manager->getNextFrameForRecording(isChunk0 ? 0 : 1);
    if (buffer==NULL){
        // Queue empty or buffer in an unexpected state: nothing to write or release.
        return;
    }

    FILE* chunk = isChunk0 ? chunk0 : chunk1;
    const int bufSize = isChunk0 ? bufSize0 : bufSize1;
    // A negative size must not wrap around to a huge size_t.
    const size_t expected = (bufSize>0) ? (size_t)bufSize : 0;

    size_t nbytes = 0;
    if (chunk!=NULL){  // fwrite on a NULL stream is invalid
        nbytes = fwrite(buffer, 1, expected, chunk);
    }

    if (nbytes==0){
        cout << "No data were written to file." << endl;
    }
    else if (nbytes<expected){
        // size_t is unsigned, so only a comparison against the requested
        // size can detect a short write.
        cout << "Incomplete write: " << nbytes << " of " << expected << " bytes were written to file." << endl;
    }

    manager->notifyRecordedFrame(buffer,chunkIdentifier);

    if (isChunk0) frameCounter0++;
    else frameCounter1++;
}

// Thread entry point for one chunk: keeps writing queued frames while
// recording is active, and after it has been stopped until the queue of the
// chunk is drained. Sleeps 10 ms whenever the queue is empty.
//
// NOTE(review): `exitCode` is never fulfilled here, so a future obtained from
// that promise will not become ready through this function - confirm whether
// a caller waits on it.
// NOTE(review): isRecording is a plain bool read from this thread while
// another thread may change it; consider making it atomic.
void Recorder::performRecording(promise<int>* exitCode, int chunkIdentifier){
    (void)exitCode;  // intentionally unused, see note above

    int remaining = manager->hasFramesForRecording(chunkIdentifier);
    while (recorderInstance->isRecording || remaining>0){
        if (remaining>0){
            if (!recorderInstance->isRecording){
                cout << "Acquisition stopped, still " << remaining << " frames are to be recorded in chunk " << chunkIdentifier << endl;
            }
            recorderInstance->writeNextItem(chunkIdentifier);
        }
        else{
            // Nothing queued yet: back off briefly instead of busy-waiting.
            this_thread::sleep_for(chrono::milliseconds(10));
        }
        remaining = manager->hasFramesForRecording(chunkIdentifier);
    }
    cout << "Done recording." << endl;
}

在您给出的 Windows 内存使用截图中,最大的一块 (45GB) 是 "cached",其中 27GB 是 "modified",即 "dirty pages waiting to be written to disk"(等待写入磁盘的脏页)。这是正常行为,因为您写入数据的速度超过了磁盘 I/O 能跟上的速度。flush/fflush 对此没有影响,因为这部分缓存不属于您的进程。正如您所说:"the memory used by my application remains constant the whole time"。别担心。但是,如果您确实不希望操作系统缓冲这些脏输出页面,可以考虑使用 Windows 上提供的 "unbuffered I/O"(无缓冲 I/O),因为它会立即直写(write through)到磁盘。

编辑:一些指向 Windows 上无缓冲 I/O 的链接。请注意,无缓冲 I/O 对您的读取和写入设置了内存对齐限制。

File Buffering

CreateFile function