为什么 Windows 任务管理器在写入非常大的文件时显示内存增加?我应该担心吗?
Why does Windows task manager show memory increases while writing very large files? Should I be worried?
我正在开发一个 c++ 应用程序(在 VS2012 中,Windows Server 2012 R2),它将大量二进制数据从已分配的循环缓冲区数组写入原始文件。问题是 Windows 任务管理器报告的系统 RAM 使用率随着 fwrite 将数据写入文件而线性增加,直到达到几乎保持不变的某个点(另请参见下图)。此外,我的应用程序使用的内存始终保持不变。
我定期调用 fflush,但没有效果。虽然这看起来是一种无害的情况,但我担心它会影响性能,因为在正常运行时还会有另一个 Java 应用程序同时运行。
因此,我想问一下我是否应该担心这个问题,以及是否有办法避免这个问题,以实现实时数据记录系统的最佳性能。
针对 Linux 操作系统,已有人在这里和这里提出过类似的问题,据说只要有足够的可用内存,系统就可以拿出一定数量的内存来缓存数据。
接下来介绍该应用程序的一部分。简而言之,应用程序控制一对相机,每个相机获取帧并将其存储在正确对齐的已分配缓冲区中。其中包含:i) 一个 CameraInterface 类,它创建两个“生产者”(producer)线程;ii) 一个 Recorder 类,它创建两个“消费者”(consumer)线程;iii) 一个 SharedMemoryManager 类,它为生产者提供可用于存储数据的缓冲区,并为消费者提供下一个要写入文件的缓冲区。SharedMemoryManager 包含两个缓冲区数组(每对生产者-消费者一个)和两个相应的标志数组,用于指示各缓冲区的状态。它还包含两个 std::queue 对象,用于快速访问下一个要记录的缓冲区。下面给出 Recorder 和 SharedMemoryManager 的部分代码。
// somewhere in file "atcore.h"...
// AT_U8: unsigned byte type; used below as the element type of the frame buffers.
typedef unsigned char AT_U8;
// File: SharedMemoryManager.h
#ifndef __MEM_MANAGER__
#define __MEM_MANAGER__
#pragma once
#include "atcore.h"
#include <queue>
#include <mutex>
// Number of frame buffers (and matching status flags) allocated per sharedMemory instance.
#define NBUFFERS 128
// Per-buffer status codes stored in sharedMemory::flags.
#define BUFFER_AVAILABLE 0
#define BUFFER_QUEUED 1
#define BUFFER_FULL 2
#define BUFFER_RECORDING_PENDING 3
// The status flag cycle is
// AVAILABLE -> QUEUED -> FULL -> RECORDING_PENDING -> AVAILABLE
// (buffers start out as BUFFER_AVAILABLE; an earlier version of this note called that state "EMPTY").
// NOTE(review): a using-directive in a header leaks namespace std into every file that includes it.
using namespace std;
// Buffer pool for one camera: the frame buffers, their status flags and the
// mutex that is held while the flags are inspected or changed.
struct sharedMemory {
    AT_U8** buffers;      // array of NBUFFERS frame-buffer pointers
    int*    flags;        // array of NBUFFERS BUFFER_* status codes, one per buffer
    int     acquiredCounter;  // presumably the number of acquired frames -- usage not shown here, confirm
    int     consumedCounter;  // presumably the number of consumed (recorded) frames -- confirm
    int     queuedCounter;    // presumably the number of buffers queued in the camera -- confirm
    mutex   flagMtx;      // locked around accesses to flags[]
};
// Entry of the "frames for recording" queues: identifies one filled buffer.
struct record {
    AT_U8* buffer;   // the frame buffer this entry refers to
    int    bufSize;  // size associated with the buffer (presumably in bytes -- confirm)
    int    index;    // position of the buffer in sharedMemory::buffers / flags
};
// Hands out frame buffers to the camera (producer) side and queued frames to
// the recorder (consumer) side. Two cameras are supported, identified by the
// cameraIdentifier values 0 and 1; each has its own sharedMemory pool and its
// own queue of frames waiting to be recorded.
class SharedMemoryManager
{
public:
SharedMemoryManager();
~SharedMemoryManager(void);
void enableRecording();
void disableRecording();
int setupMemory(int cameraIdentifier, int bufferSize);
void freeMemory();
void freeCameraMemory(int cameraIdentifier);
int getBufferSize(int cameraIdentifier);
AT_U8* getBufferForCameraQueue(int cameraIdentifier); // get pointer to the next available buffer for queueing in the camera
int hasFramesForRecording(int cameraIdentifier); // number of frames waiting in the respective recording queue; -1 for an invalid camera id
AT_U8* getNextFrameForRecording(int cameraIdentifier); // get pointer to the next buffer to be recorded to a file; NULL when there is none
void copyMostRecentFrame(unsigned char* buffer, int cameraIdentifier); // TODO // get a copy of the most recent frame on the buffer
void notifyAcquiredFrame(AT_U8* buffer, int bufSize, int cameraIdentifier); // use this function to notify the manager that the buffer has just been filled with data
void notifyRecordedFrame(AT_U8* buffer, int cameraIdentifier); // use this function to notify the manager that the buffer has just been written to file and can be used again
private:
bool useMem0, useMem1;
int bufSize0, bufSize1;
// Buffer pools for camera 0 and camera 1.
sharedMemory* memory0;
sharedMemory* memory1;
// Queues of heap-allocated records for frames waiting to be written to file;
// getNextFrameForRecording pops and deletes the front record.
queue<record*> framesForRecording0;
queue<record*> framesForRecording1;
bool isRecording;
// Allocates NBUFFERS buffers of bufSize bytes plus the flag array; returns 0 on success, -1 on failure.
int allocateBuffers(sharedMemory* mem, int bufSize);
// Releases all buffers, the flag array and the sharedMemory object itself.
void freeBufferArray(sharedMemory* mem);
};
#endif // !__MEM_MANAGER
// File: SharedMemoryManager.cpp
...
// Reports how many frames are currently waiting in the recording queue of
// the given camera (0 or 1). An unknown camera id is logged and yields -1.
int SharedMemoryManager::hasFramesForRecording(int cameraIdentifier){
    switch (cameraIdentifier){
    case 0:
        return static_cast<int>(framesForRecording0.size());
    case 1:
        return static_cast<int>(framesForRecording1.size());
    default:
        cout << "Could not get the number of frames in the shared memory. Invalid camera id " << cameraIdentifier << endl;
        return -1;
    }
}
// Pops the oldest entry from the recording queue of the given camera (0 or 1)
// and returns the buffer it refers to, after moving that buffer's flag from
// BUFFER_FULL to BUFFER_RECORDING_PENDING.
//
// Returns NULL when the camera id is invalid, when the queue is empty, or
// when the buffer's flag is not BUFFER_FULL (in that case the queue entry has
// already been consumed and a dump of all flags is printed).
//
// NOTE(review): the queue itself is accessed here without holding any lock;
// confirm that the code pushing into it cannot run concurrently with this pop.
AT_U8* SharedMemoryManager::getNextFrameForRecording(int cameraIdentifier){
    if (cameraIdentifier!=0 && cameraIdentifier!=1){
        cout << "Error in getNextFrameForRecording. Invalid camera id " << cameraIdentifier << endl;
        return NULL;
    }

    const bool firstCamera = (cameraIdentifier==0);
    sharedMemory* mem = firstCamera ? memory0 : memory1;
    queue<record*>& pending = firstCamera ? framesForRecording0 : framesForRecording1;

    if (pending.empty()){ // no frames to be recorded at the moment
        return NULL;
    }

    // Take ownership of the front record; only its buffer index is needed.
    record* item = pending.front();
    pending.pop();
    const int idx = item->index;
    delete item;

    // RAII lock: the mutex is released on every exit path, including an
    // exception thrown by the stream insertions below.
    lock_guard<mutex> guard(mem->flagMtx);
    if (mem->flags[idx] == BUFFER_FULL){
        mem->flags[idx] = BUFFER_RECORDING_PENDING;
        return mem->buffers[idx];
    }

    cout << "PROBLEM. Buffer in getBufferForRecording. Buffer flag is " << mem->flags[idx] << endl;
    cout << "----- BUFFER FLAGS -----" << endl;
    for (int i=0; i<NBUFFERS; i++){
        cout << "[" << i << "] " << mem->flags[i] << endl;
    }
    cout << "----- -----" << endl;
    return NULL;
}
// Allocates the pointer array, the flag array and NBUFFERS 8-byte-aligned
// buffers of bufSize bytes each inside *mem, marking every buffer as
// BUFFER_AVAILABLE.
//
// Returns 0 on success and -1 on failure. On failure everything allocated by
// this call is released and mem->buffers / mem->flags are left as NULL, so a
// later cleanup does not free the same memory a second time.
int SharedMemoryManager::allocateBuffers(sharedMemory* mem, int bufSize){
    if (mem==NULL || bufSize<=0){
        cout << "Could not allocate buffers. Invalid arguments." << endl;
        return -1;
    }

    // Start from a well-defined state so that every failure path below
    // leaves no dangling pointer behind.
    mem->buffers = NULL;
    mem->flags = NULL;

    // allocate the array for the buffers (zero-filled, so unused slots are NULL)
    mem->buffers = static_cast<AT_U8**>(calloc(NBUFFERS,sizeof(AT_U8*)));
    if (mem->buffers==NULL){
        cout << "Could not allocate array of buffers." << endl;
        return -1;
    }

    // allocate the array for the respective flags
    mem->flags = static_cast<int*>(malloc(NBUFFERS*sizeof(int)));
    if (mem->flags==NULL){
        cout << "Could not allocate array of flags for the buffers." << endl;
        free(mem->buffers);
        mem->buffers = NULL;
        return -1;
    }

    for (int i=0; i<NBUFFERS; i++){ // allocate the buffers
        mem->buffers[i] = static_cast<AT_U8*>(_aligned_malloc(static_cast<size_t>(bufSize),8));
        if (mem->buffers[i] == NULL){
            cout << "Could not allocate memory for buffer no. " << i << endl;
            for (int j=0; j<i; j++){ // free the previously allocated buffers
                _aligned_free(mem->buffers[j]);
            }
            free(mem->buffers);
            mem->buffers = NULL;
            free(mem->flags);
            mem->flags = NULL;
            return -1;
        }
        mem->flags[i]=BUFFER_AVAILABLE;
    }
    return 0;
}
// Releases every buffer of *mem, then the pointer array, the flag array and
// finally the sharedMemory object itself. A NULL mem is ignored, and a NULL
// mem->buffers (e.g. after a failed allocation) is skipped instead of being
// dereferenced.
//
// The pointer is passed by value, so the caller's copy is NOT reset here;
// callers must set their own pointer to NULL after this call.
//
// NOTE(review): sharedMemory contains a std::mutex, and free() does not run
// its destructor -- confirm how the object is allocated (malloc vs. new).
void SharedMemoryManager::freeBufferArray(sharedMemory* mem){
    if (mem==NULL){
        return;
    }
    if (mem->buffers!=NULL){
        for (int i=0; i<NBUFFERS; i++){
            _aligned_free(mem->buffers[i]); // a NULL entry is accepted by _aligned_free
        }
        free(mem->buffers);
    }
    free(mem->flags);
    free(mem);
}
// File: Recorder.h
#ifndef __RECORDER__
#define __RECORDER__
#pragma once
#include <string>
#include <queue>
#include <future>
#include <thread>
#include "atcore.h"
#include "SharedMemoryManager.h"
using namespace std;
// Writes frames obtained from a SharedMemoryManager to up to two output
// files ("chunks" 0 and 1), using one recording thread per chunk.
class Recorder
{
public:
Recorder(SharedMemoryManager* memoryManager);
~Recorder();
void recordBuffer(AT_U8 *buffer, int bufsize);
int setupRecording(string filename0, string filename1, bool open0, bool open1);
// Starts the recording threads (one per enabled chunk) unless recording is already active.
void startRecording();
void stopRecording();
int testWriteSpeed(string directoryPath, string filename);
void insertFrameItem(AT_U8* buffer, int bufSize, int chunkID);
private:
// Output files for chunk 0 and chunk 1.
FILE *chunk0, *chunk1;
string chunkFilename0, chunkFilename1;
// Incremented by writeNextItem for each frame it writes to the respective chunk.
int frameCounter0, frameCounter1;
// When true, startRecording launches a recording thread for that chunk.
bool writes0, writes1;
// Number of bytes written per frame for each chunk.
int bufSize0, bufSize1;
static SharedMemoryManager* manager;
// Polled by the recording threads; they keep running while it is true or frames remain.
bool isRecording;
// Passed to the recording threads. NOTE(review): performRecording does not set a value on them in the code shown.
promise<int> prom0;
promise<int> prom1;
thread* recordingThread0;
thread* recordingThread1;
// Thread entry point: drains the recording queue of the given chunk.
static void performRecording(promise<int>* exitCode, int chunkIdentifier);
// Fetches the next queued frame for the chunk and writes it to the chunk's file.
void writeNextItem(int chunkIdentifier);
void closeFiles();
};
#endif //!__RECORDER__
// File: Recorder.cpp
#include "Recorder.h"
#include <ctime>
#include <iostream>
using namespace std;
// Pointer to the current Recorder instance; the static thread entry point
// (performRecording) uses it to reach the non-static members of the object.
// NOTE(review): not assigned in the code shown here -- presumably set by the constructor; confirm.
Recorder* recorderInstance; // keep a pointer to the current instance, for accessing static functions from (non-static) objects in the threads
// Definition of the static member; kept static for the same reason (used from the static thread function).
SharedMemoryManager* Recorder::manager; // the same reason
...
// Marks recording as active and launches one recording thread per enabled
// chunk. Does nothing when recording is already active, so no new threads
// are started while earlier ones may still be running.
void Recorder::startRecording(){
    if (isRecording){
        return;
    }
    isRecording = true;
    if (writes0){
        recordingThread0 = new thread(&Recorder::performRecording, &prom0, 0);
    }
    if (writes1){
        recordingThread1 = new thread(&Recorder::performRecording, &prom1, 1);
    }
}
// Fetches the next queued frame for the given chunk (0 selects chunk 0, any
// other value chunk 1), writes it to the chunk's file, hands the buffer back
// to the memory manager and increments the chunk's frame counter.
//
// If the manager has no frame to offer (it returns NULL), nothing is written,
// no buffer is released and the counter is left unchanged.
void Recorder::writeNextItem(int chunkIdentifier){
    const bool firstChunk = (chunkIdentifier==0);
    FILE* chunk = firstChunk ? chunk0 : chunk1;
    const int bufSize = firstChunk ? bufSize0 : bufSize1;

    AT_U8* buffer = manager->getNextFrameForRecording(firstChunk ? 0 : 1);
    if (buffer==NULL){
        // Queue was empty or the buffer was in an unexpected state; passing
        // NULL on to fwrite / notifyRecordedFrame would be invalid.
        return;
    }

    const size_t expected = static_cast<size_t>(bufSize);
    const size_t nbytes = fwrite(buffer, 1, expected, chunk);
    if (nbytes==0){
        cout << "No data were written to file." << endl;
    }
    else if (nbytes<expected){
        // fwrite returns the number of bytes actually written; anything
        // short of the request means the frame is truncated on disk.
        cout << "Incomplete write: " << nbytes << " of " << expected << " bytes were written to file." << endl;
    }

    // Release the buffer even after a failed write so that it can be reused.
    manager->notifyRecordedFrame(buffer,chunkIdentifier);
    if (firstChunk) frameCounter0++;
    else frameCounter1++;
}
// Thread entry point for one chunk: keeps writing queued frames while
// recording is active, and after recording has been stopped continues until
// the chunk's queue has been drained. Sleeps 10 ms whenever the queue is
// empty.
//
// NOTE(review): exitCode is never given a value in this function, so a future
// obtained from the promise will not become ready here -- confirm whether
// that is intended.
// NOTE(review): recorderInstance->isRecording is a plain bool read here while
// another thread may change it; no synchronization is visible in this code.
void Recorder::performRecording(promise<int>* exitCode, int chunkIdentifier){
    (void)exitCode; // currently unused, see note above

    int remaining = manager->hasFramesForRecording(chunkIdentifier);
    while( recorderInstance->isRecording==true || remaining>0 ){
        if (remaining>0){
            if (recorderInstance->isRecording==false){
                cout << "Acquisition stopped, still " << remaining << " frames are to be recorded in chunk " << chunkIdentifier << endl;
            }
            recorderInstance->writeNextItem(chunkIdentifier);
        }
        else{
            this_thread::sleep_for(chrono::milliseconds(10));
        }
        remaining = manager->hasFramesForRecording(chunkIdentifier);
    }
    cout << "Done recording." << endl;
}
在您给出的 Windows 内存使用截图中,最大的一块(45GB)是“已缓存”(cached),其中 27GB 是“已修改”(modified),即“等待写入磁盘的脏页”(dirty pages waiting to be written to disk)。这是正常行为,因为您写入数据的速度超过了磁盘 I/O 能跟上的速度。flush/fflush 对此没有影响,因为这部分内存不属于您的进程。正如您所看到的:“我的应用程序使用的内存始终保持不变”(the memory used by my application remains constant the whole time)。不必担心。但是,如果您确实不希望操作系统缓冲脏的输出页面,可以考虑使用 Windows 提供的“无缓冲 I/O”(unbuffered I/O),因为它会立即直写到磁盘。
编辑:一些指向 Windows 上无缓冲 I/O 的链接。请注意,无缓冲 I/O 对您的读取和写入设置了内存对齐限制。
我正在开发一个 c++ 应用程序(在 VS2012 中,Windows Server 2012 R2),它将大量二进制数据从已分配的循环缓冲区数组写入原始文件。问题是 Windows 任务管理器报告的系统 RAM 使用率随着 fwrite 将数据写入文件而线性增加,直到达到几乎保持不变的某个点(另请参见下图)。此外,我的应用程序使用的内存始终保持不变。
我定期调用 fflush,但没有效果。虽然这看起来是一种无害的情况,但我担心它会影响性能,因为在正常运行时还会有另一个 Java 应用程序同时运行。
因此,我想问一下我是否应该担心这个问题,以及是否有办法避免这个问题,以实现实时数据记录系统的最佳性能。
针对 Linux 操作系统,已有人在这里和这里提出过类似的问题,据说只要有足够的可用内存,系统就可以拿出一定数量的内存来缓存数据。
接下来介绍该应用程序的一部分。简而言之,应用程序控制一对相机,每个相机获取帧并将其存储在正确对齐的已分配缓冲区中。其中包含:i) 一个 CameraInterface 类,它创建两个“生产者”(producer)线程;ii) 一个 Recorder 类,它创建两个“消费者”(consumer)线程;iii) 一个 SharedMemoryManager 类,它为生产者提供可用于存储数据的缓冲区,并为消费者提供下一个要写入文件的缓冲区。SharedMemoryManager 包含两个缓冲区数组(每对生产者-消费者一个)和两个相应的标志数组,用于指示各缓冲区的状态。它还包含两个 std::queue 对象,用于快速访问下一个要记录的缓冲区。下面给出 Recorder 和 SharedMemoryManager 的部分代码。
// somewhere in file "atcore.h"...
// AT_U8: unsigned byte type; used below as the element type of the frame buffers.
typedef unsigned char AT_U8;
// File: SharedMemoryManager.h
#ifndef __MEM_MANAGER__
#define __MEM_MANAGER__
#pragma once
#include "atcore.h"
#include <queue>
#include <mutex>
// Number of frame buffers (and matching status flags) allocated per sharedMemory instance.
#define NBUFFERS 128
// Per-buffer status codes stored in sharedMemory::flags.
#define BUFFER_AVAILABLE 0
#define BUFFER_QUEUED 1
#define BUFFER_FULL 2
#define BUFFER_RECORDING_PENDING 3
// The status flag cycle is
// AVAILABLE -> QUEUED -> FULL -> RECORDING_PENDING -> AVAILABLE
// (buffers start out as BUFFER_AVAILABLE; an earlier version of this note called that state "EMPTY").
// NOTE(review): a using-directive in a header leaks namespace std into every file that includes it.
using namespace std;
// Buffer pool for one camera: the frame buffers, their status flags and the
// mutex that is held while the flags are inspected or changed.
struct sharedMemory {
    AT_U8** buffers;      // array of NBUFFERS frame-buffer pointers
    int*    flags;        // array of NBUFFERS BUFFER_* status codes, one per buffer
    int     acquiredCounter;  // presumably the number of acquired frames -- usage not shown here, confirm
    int     consumedCounter;  // presumably the number of consumed (recorded) frames -- confirm
    int     queuedCounter;    // presumably the number of buffers queued in the camera -- confirm
    mutex   flagMtx;      // locked around accesses to flags[]
};
// Entry of the "frames for recording" queues: identifies one filled buffer.
struct record {
    AT_U8* buffer;   // the frame buffer this entry refers to
    int    bufSize;  // size associated with the buffer (presumably in bytes -- confirm)
    int    index;    // position of the buffer in sharedMemory::buffers / flags
};
// Hands out frame buffers to the camera (producer) side and queued frames to
// the recorder (consumer) side. Two cameras are supported, identified by the
// cameraIdentifier values 0 and 1; each has its own sharedMemory pool and its
// own queue of frames waiting to be recorded.
class SharedMemoryManager
{
public:
SharedMemoryManager();
~SharedMemoryManager(void);
void enableRecording();
void disableRecording();
int setupMemory(int cameraIdentifier, int bufferSize);
void freeMemory();
void freeCameraMemory(int cameraIdentifier);
int getBufferSize(int cameraIdentifier);
AT_U8* getBufferForCameraQueue(int cameraIdentifier); // get pointer to the next available buffer for queueing in the camera
int hasFramesForRecording(int cameraIdentifier); // number of frames waiting in the respective recording queue; -1 for an invalid camera id
AT_U8* getNextFrameForRecording(int cameraIdentifier); // get pointer to the next buffer to be recorded to a file; NULL when there is none
void copyMostRecentFrame(unsigned char* buffer, int cameraIdentifier); // TODO // get a copy of the most recent frame on the buffer
void notifyAcquiredFrame(AT_U8* buffer, int bufSize, int cameraIdentifier); // use this function to notify the manager that the buffer has just been filled with data
void notifyRecordedFrame(AT_U8* buffer, int cameraIdentifier); // use this function to notify the manager that the buffer has just been written to file and can be used again
private:
bool useMem0, useMem1;
int bufSize0, bufSize1;
// Buffer pools for camera 0 and camera 1.
sharedMemory* memory0;
sharedMemory* memory1;
// Queues of heap-allocated records for frames waiting to be written to file;
// getNextFrameForRecording pops and deletes the front record.
queue<record*> framesForRecording0;
queue<record*> framesForRecording1;
bool isRecording;
// Allocates NBUFFERS buffers of bufSize bytes plus the flag array; returns 0 on success, -1 on failure.
int allocateBuffers(sharedMemory* mem, int bufSize);
// Releases all buffers, the flag array and the sharedMemory object itself.
void freeBufferArray(sharedMemory* mem);
};
#endif // !__MEM_MANAGER
// File: SharedMemoryManager.cpp
...
// Reports how many frames are currently waiting in the recording queue of
// the given camera (0 or 1). An unknown camera id is logged and yields -1.
int SharedMemoryManager::hasFramesForRecording(int cameraIdentifier){
    switch (cameraIdentifier){
    case 0:
        return static_cast<int>(framesForRecording0.size());
    case 1:
        return static_cast<int>(framesForRecording1.size());
    default:
        cout << "Could not get the number of frames in the shared memory. Invalid camera id " << cameraIdentifier << endl;
        return -1;
    }
}
// Pops the oldest entry from the recording queue of the given camera (0 or 1)
// and returns the buffer it refers to, after moving that buffer's flag from
// BUFFER_FULL to BUFFER_RECORDING_PENDING.
//
// Returns NULL when the camera id is invalid, when the queue is empty, or
// when the buffer's flag is not BUFFER_FULL (in that case the queue entry has
// already been consumed and a dump of all flags is printed).
//
// NOTE(review): the queue itself is accessed here without holding any lock;
// confirm that the code pushing into it cannot run concurrently with this pop.
AT_U8* SharedMemoryManager::getNextFrameForRecording(int cameraIdentifier){
    if (cameraIdentifier!=0 && cameraIdentifier!=1){
        cout << "Error in getNextFrameForRecording. Invalid camera id " << cameraIdentifier << endl;
        return NULL;
    }

    const bool firstCamera = (cameraIdentifier==0);
    sharedMemory* mem = firstCamera ? memory0 : memory1;
    queue<record*>& pending = firstCamera ? framesForRecording0 : framesForRecording1;

    if (pending.empty()){ // no frames to be recorded at the moment
        return NULL;
    }

    // Take ownership of the front record; only its buffer index is needed.
    record* item = pending.front();
    pending.pop();
    const int idx = item->index;
    delete item;

    // RAII lock: the mutex is released on every exit path, including an
    // exception thrown by the stream insertions below.
    lock_guard<mutex> guard(mem->flagMtx);
    if (mem->flags[idx] == BUFFER_FULL){
        mem->flags[idx] = BUFFER_RECORDING_PENDING;
        return mem->buffers[idx];
    }

    cout << "PROBLEM. Buffer in getBufferForRecording. Buffer flag is " << mem->flags[idx] << endl;
    cout << "----- BUFFER FLAGS -----" << endl;
    for (int i=0; i<NBUFFERS; i++){
        cout << "[" << i << "] " << mem->flags[i] << endl;
    }
    cout << "----- -----" << endl;
    return NULL;
}
// Allocates the pointer array, the flag array and NBUFFERS 8-byte-aligned
// buffers of bufSize bytes each inside *mem, marking every buffer as
// BUFFER_AVAILABLE.
//
// Returns 0 on success and -1 on failure. On failure everything allocated by
// this call is released and mem->buffers / mem->flags are left as NULL, so a
// later cleanup does not free the same memory a second time.
int SharedMemoryManager::allocateBuffers(sharedMemory* mem, int bufSize){
    if (mem==NULL || bufSize<=0){
        cout << "Could not allocate buffers. Invalid arguments." << endl;
        return -1;
    }

    // Start from a well-defined state so that every failure path below
    // leaves no dangling pointer behind.
    mem->buffers = NULL;
    mem->flags = NULL;

    // allocate the array for the buffers (zero-filled, so unused slots are NULL)
    mem->buffers = static_cast<AT_U8**>(calloc(NBUFFERS,sizeof(AT_U8*)));
    if (mem->buffers==NULL){
        cout << "Could not allocate array of buffers." << endl;
        return -1;
    }

    // allocate the array for the respective flags
    mem->flags = static_cast<int*>(malloc(NBUFFERS*sizeof(int)));
    if (mem->flags==NULL){
        cout << "Could not allocate array of flags for the buffers." << endl;
        free(mem->buffers);
        mem->buffers = NULL;
        return -1;
    }

    for (int i=0; i<NBUFFERS; i++){ // allocate the buffers
        mem->buffers[i] = static_cast<AT_U8*>(_aligned_malloc(static_cast<size_t>(bufSize),8));
        if (mem->buffers[i] == NULL){
            cout << "Could not allocate memory for buffer no. " << i << endl;
            for (int j=0; j<i; j++){ // free the previously allocated buffers
                _aligned_free(mem->buffers[j]);
            }
            free(mem->buffers);
            mem->buffers = NULL;
            free(mem->flags);
            mem->flags = NULL;
            return -1;
        }
        mem->flags[i]=BUFFER_AVAILABLE;
    }
    return 0;
}
// Releases every buffer of *mem, then the pointer array, the flag array and
// finally the sharedMemory object itself. A NULL mem is ignored, and a NULL
// mem->buffers (e.g. after a failed allocation) is skipped instead of being
// dereferenced.
//
// The pointer is passed by value, so the caller's copy is NOT reset here;
// callers must set their own pointer to NULL after this call.
//
// NOTE(review): sharedMemory contains a std::mutex, and free() does not run
// its destructor -- confirm how the object is allocated (malloc vs. new).
void SharedMemoryManager::freeBufferArray(sharedMemory* mem){
    if (mem==NULL){
        return;
    }
    if (mem->buffers!=NULL){
        for (int i=0; i<NBUFFERS; i++){
            _aligned_free(mem->buffers[i]); // a NULL entry is accepted by _aligned_free
        }
        free(mem->buffers);
    }
    free(mem->flags);
    free(mem);
}
// File: Recorder.h
#ifndef __RECORDER__
#define __RECORDER__
#pragma once
#include <string>
#include <queue>
#include <future>
#include <thread>
#include "atcore.h"
#include "SharedMemoryManager.h"
using namespace std;
// Writes frames obtained from a SharedMemoryManager to up to two output
// files ("chunks" 0 and 1), using one recording thread per chunk.
class Recorder
{
public:
Recorder(SharedMemoryManager* memoryManager);
~Recorder();
void recordBuffer(AT_U8 *buffer, int bufsize);
int setupRecording(string filename0, string filename1, bool open0, bool open1);
// Starts the recording threads (one per enabled chunk) unless recording is already active.
void startRecording();
void stopRecording();
int testWriteSpeed(string directoryPath, string filename);
void insertFrameItem(AT_U8* buffer, int bufSize, int chunkID);
private:
// Output files for chunk 0 and chunk 1.
FILE *chunk0, *chunk1;
string chunkFilename0, chunkFilename1;
// Incremented by writeNextItem for each frame it writes to the respective chunk.
int frameCounter0, frameCounter1;
// When true, startRecording launches a recording thread for that chunk.
bool writes0, writes1;
// Number of bytes written per frame for each chunk.
int bufSize0, bufSize1;
static SharedMemoryManager* manager;
// Polled by the recording threads; they keep running while it is true or frames remain.
bool isRecording;
// Passed to the recording threads. NOTE(review): performRecording does not set a value on them in the code shown.
promise<int> prom0;
promise<int> prom1;
thread* recordingThread0;
thread* recordingThread1;
// Thread entry point: drains the recording queue of the given chunk.
static void performRecording(promise<int>* exitCode, int chunkIdentifier);
// Fetches the next queued frame for the chunk and writes it to the chunk's file.
void writeNextItem(int chunkIdentifier);
void closeFiles();
};
#endif //!__RECORDER__
// File: Recorder.cpp
#include "Recorder.h"
#include <ctime>
#include <iostream>
using namespace std;
// Pointer to the current Recorder instance; the static thread entry point
// (performRecording) uses it to reach the non-static members of the object.
// NOTE(review): not assigned in the code shown here -- presumably set by the constructor; confirm.
Recorder* recorderInstance; // keep a pointer to the current instance, for accessing static functions from (non-static) objects in the threads
// Definition of the static member; kept static for the same reason (used from the static thread function).
SharedMemoryManager* Recorder::manager; // the same reason
...
// Marks recording as active and launches one recording thread per enabled
// chunk. Does nothing when recording is already active, so no new threads
// are started while earlier ones may still be running.
void Recorder::startRecording(){
    if (isRecording){
        return;
    }
    isRecording = true;
    if (writes0){
        recordingThread0 = new thread(&Recorder::performRecording, &prom0, 0);
    }
    if (writes1){
        recordingThread1 = new thread(&Recorder::performRecording, &prom1, 1);
    }
}
// Fetches the next queued frame for the given chunk (0 selects chunk 0, any
// other value chunk 1), writes it to the chunk's file, hands the buffer back
// to the memory manager and increments the chunk's frame counter.
//
// If the manager has no frame to offer (it returns NULL), nothing is written,
// no buffer is released and the counter is left unchanged.
void Recorder::writeNextItem(int chunkIdentifier){
    const bool firstChunk = (chunkIdentifier==0);
    FILE* chunk = firstChunk ? chunk0 : chunk1;
    const int bufSize = firstChunk ? bufSize0 : bufSize1;

    AT_U8* buffer = manager->getNextFrameForRecording(firstChunk ? 0 : 1);
    if (buffer==NULL){
        // Queue was empty or the buffer was in an unexpected state; passing
        // NULL on to fwrite / notifyRecordedFrame would be invalid.
        return;
    }

    const size_t expected = static_cast<size_t>(bufSize);
    const size_t nbytes = fwrite(buffer, 1, expected, chunk);
    if (nbytes==0){
        cout << "No data were written to file." << endl;
    }
    else if (nbytes<expected){
        // fwrite returns the number of bytes actually written; anything
        // short of the request means the frame is truncated on disk.
        cout << "Incomplete write: " << nbytes << " of " << expected << " bytes were written to file." << endl;
    }

    // Release the buffer even after a failed write so that it can be reused.
    manager->notifyRecordedFrame(buffer,chunkIdentifier);
    if (firstChunk) frameCounter0++;
    else frameCounter1++;
}
// Thread entry point for one chunk: keeps writing queued frames while
// recording is active, and after recording has been stopped continues until
// the chunk's queue has been drained. Sleeps 10 ms whenever the queue is
// empty.
//
// NOTE(review): exitCode is never given a value in this function, so a future
// obtained from the promise will not become ready here -- confirm whether
// that is intended.
// NOTE(review): recorderInstance->isRecording is a plain bool read here while
// another thread may change it; no synchronization is visible in this code.
void Recorder::performRecording(promise<int>* exitCode, int chunkIdentifier){
    (void)exitCode; // currently unused, see note above

    int remaining = manager->hasFramesForRecording(chunkIdentifier);
    while( recorderInstance->isRecording==true || remaining>0 ){
        if (remaining>0){
            if (recorderInstance->isRecording==false){
                cout << "Acquisition stopped, still " << remaining << " frames are to be recorded in chunk " << chunkIdentifier << endl;
            }
            recorderInstance->writeNextItem(chunkIdentifier);
        }
        else{
            this_thread::sleep_for(chrono::milliseconds(10));
        }
        remaining = manager->hasFramesForRecording(chunkIdentifier);
    }
    cout << "Done recording." << endl;
}
在您给出的 Windows 内存使用截图中,最大的一块(45GB)是“已缓存”(cached),其中 27GB 是“已修改”(modified),即“等待写入磁盘的脏页”(dirty pages waiting to be written to disk)。这是正常行为,因为您写入数据的速度超过了磁盘 I/O 能跟上的速度。flush/fflush 对此没有影响,因为这部分内存不属于您的进程。正如您所看到的:“我的应用程序使用的内存始终保持不变”(the memory used by my application remains constant the whole time)。不必担心。但是,如果您确实不希望操作系统缓冲脏的输出页面,可以考虑使用 Windows 提供的“无缓冲 I/O”(unbuffered I/O),因为它会立即直写到磁盘。
编辑:一些指向 Windows 上无缓冲 I/O 的链接。请注意,无缓冲 I/O 对您的读取和写入设置了内存对齐限制。