Node.js Heap Overflow error

After running my code for 1-2 days, I get this error on AWS EC2.

Error

<--- Last few GCs --->
st[10805:0x41cdff0]  7130379 ms: Mark-sweep 33.2 (78.7) -> 21.1 (75.8) MB, 13.8 / 0.1 ms  (+ 23.1 ms in 23 steps since start of marking, biggest step 4.3 ms, walltime since start of marking 160 ms) final$

<--- JS stacktrace --->
Cannot get stack trace in GC.
FATAL ERROR: Scavenger: promoting marked
Allocation failed - process out of memory
1: node::Abort() [node]
2: 0x12b288c [node]
3: v8::Utils::ReportOOMFailure(char const*, bool) [node]
4: v8::internal::V8::FatalProcessOutOfMemory(char const*, bool) [node]
5: 0xa96bfb [node]
6: void v8::internal::ScavengingVisitor<(v8::internal::MarksHandling)0, (v8::internal::PromotionMode)0, (v8::internal::LoggingAndProfiling)1>::EvacuateObject<(v8::internal::ScavengingVisitor<(v8::intern$
 7: v8::internal::Scavenger::ScavengeObject(v8::internal::HeapObject**, v8::internal::HeapObject*) [node]
 8: v8::internal::Heap::IteratePromotedObjectPointers(v8::internal::HeapObject*, unsigned char*, unsigned char*, bool, void (*)(v8::internal::HeapObject**, v8::internal::HeapObject*)) [node]
 9: void v8::internal::BodyDescriptorBase::IterateBodyImpl<v8::internal::ObjectVisitor>(v8::internal::HeapObject*, int, int, v8::internal::ObjectVisitor*) [node]
10: void v8::internal::BodyDescriptorApply<v8::internal::CallIterateBody, void, v8::internal::HeapObject*, int, v8::internal::ObjectVisitor*>(v8::internal::InstanceType, v8::internal::HeapObject*, int, v$
11: v8::internal::Heap::DoScavenge(v8::internal::ObjectVisitor*, unsigned char*, v8::internal::PromotionMode) [node]
12: v8::internal::Heap::Scavenge() [node]
13: v8::internal::Heap::PerformGarbageCollection(v8::internal::GarbageCollector, v8::GCCallbackFlags) [node]
14: v8::internal::Heap::CollectGarbage(v8::internal::GarbageCollector, v8::internal::GarbageCollectionReason, char const*, v8::GCCallbackFlags) [node]
15: v8::internal::Factory::NewRawTwoByteString(int, v8::internal::PretenureFlag) [node]
16: v8::internal::Factory::NewStringFromUtf8(v8::internal::Vector<char const>, v8::internal::PretenureFlag) [node]
17: v8::String::NewFromUtf8(v8::Isolate*, char const*, v8::String::NewStringType, int) [node]
18: node::StringBytes::Encode(v8::Isolate*, char const*, unsigned long, node::encoding) [node]
19: void node::Buffer::StringSlice<(node::encoding)1>(v8::FunctionCallbackInfo<v8::Value> const&) [node]
20: 0x33c699f18dcf

My main function is an async while loop that looks like this. It is the controller function of an Express route:

function controller(cb) {
  return new Promise((resolve, reject) => {
    let killed = false;
    (async() => {
      let isEmpty = false;
      while (!killed && !isEmpty) {
        const code = await processBatch();
        if (code === EMPTY_QUEUE) {
          isEmpty = true;
          console.log('ss');
          resolve(false);
        }
      }
    })();
    cb()
      .then((state) => killed = state);
  });
}

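Here, processBatch() can take around 10 seconds to resolve its promise.

Note: processBatch never returns EMPTY_QUEUE, and killed is never set to true by the callback.

Given this, can someone tell me why this controller function consumes so much memory after a while? Am I doing something that stops Node from garbage-collecting data, or something similar?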

-- Update --

This is the router code that calls the controller function. It ensures that no more than one controller is working at a time.

const query = require('../controllers/fetchContent').query;
const controller = require('../../storage/controllers/index').controller;

let isFetching = false;
let killed = false;

function killSwitch () {
 return new Promise((resolve, reject) => {
     setInterval(() => {
        if(killed) {
            resolve(killed);
        }
    }, 10000);
 })
}
module.exports = (app) => {
 app.get('/api', (req, res) => {
    res.setHeader('Content-Type', 'application/json');
    res.json({"statusCode" : 200, "body" : "Hey"})
});
 app.post('/', (req, res) => {
    if(!killed) {
        if (!isFetching) {
            isFetching = true;
            controller(killSwitch)
                    .then((response) => {
                        isFetching = response.isFetching;
                    });
            res.send({
                success: true,
                message: 'Okay I will extract send the contents to the database'
            })
        } else {
            res.send({
                success: true,
                message: 'Already Fetching'
            })
        }
    } else {
        res.send({
            success: false,
            message: 'In killed State, start to continue'
        })
    }
});
 app.post('/kill', (req, res) => {
    killed = true;
    isFetching = false;
    res.status(200).send('Okay I have stopped the fetcher process')
});
 app.post('/alive', (req, res) => {
    killed = false;
    res.send({
        success: true,
        message: 'Now New req to / will be entertained'
    })
 });
  app.post('/api/fetch', query);
};

-- Update 2 --

This is the processBatch() function. It fetches data from Amazon SQS, processes it, sends the result to another Amazon SQS queue, and notifies subscribers via Amazon SNS.

async function processBatch() {
  // Wait for the promise returned after messages are retrieved from the queue.
  let data = await getDataFromQueue();
  let listOfReceipt = [];
  if (q.length() > 50) {
    // If the queue length is more than 50, wait for the queue to process previous data
    // (done in order to put a max cap on queue size).
    await sleep(400);
    console.log(q.length());
    return CLEAN_EXIT;
  }
  // Also get the ReceiptHandles for those messages (to be used for deletion later on).

  if (!data.Messages || !data.Messages.length) {
    pushSNS(null, true);
    pushDelete(null, true);
    return EMPTY_QUEUE;
  }
  try {
    for (let i = 0; i < data.Messages.length; i++) {
      data.Messages[i].Body = JSON.parse(data.Messages[i].Body);
      const URL = data.Messages[i].Body.url;
      const identifier = data.Messages[i].Body.identifier;
      // Get the ReceiptHandle out of the message.
      listOfReceipt.push(data.Messages[i].ReceiptHandle);
      q.push(URL, async (err, html) => {
        if (err) {
          console.log(err);
        } else {
          await sendDataToQueue({url: URL, content: html, identifier});
          pushDelete(data.Messages[i].ReceiptHandle);
          pushSNS();
        }
      });
    }
  } catch (e) {
    console.log(e);
    pushSNS(null, true);
    pushDelete(null, true);
    // Simply ignore any error and delete that msg.
    return CLEAN_EXIT;
  }
  return CLEAN_EXIT;
}

Here q is an Async.queue whose worker function, extractContent, fetches the content of the provided URL.

These are the helper functions of this module.

const q = async.queue((URL, cb) => {
  extractContent(URL, array)
    .then((html) => {
      cb(null, html);
    })
    .catch((e) => {
      cb(e);
    });
}, concurrency);


function internalQueue(cb) {
    let arr = [];
    return function (message, flag) {
       arr.push(message);
       if(arr.length >= 10 || flag) {
          arr = [];
          cb();
       }
    }
}

function sleep (delay) {
 return new Promise ((resolve, reject) => {
    setTimeout(() => resolve(), delay)
  })
}
// this is done in order to do things in a batch, this reduces cost
let pushSNS = internalQueue(sendDataToSNS);
let pushDelete = internalQueue(deleteDataFromSQS);

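First, your controller function returns a Promise that is resolved only when processBatch returns EMPTY_QUEUE, and by your own statement that never happens, so the Promise never settles. I assume you store the returned Promises somewhere, and each of them consumes memory.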

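Moreover, every call to controller starts a new loop that calls processBatch indefinitely. So if controller is the controller function of an Express route, every request to that route starts another loop that calls processBatch forever. I bet that is not the desired behaviour, and it will definitely eat a lot of memory.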
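One possible way to make the Promise settle is sketched below. It is only an illustration under assumptions: `shouldStop` is a hypothetical predicate (for example `() => killed` in the router module) that replaces the `killSwitch` Promise, and `processBatch` and `EMPTY_QUEUE` are passed in so the snippet is self-contained. The result is an object with an `isFetching` field because the router reads `response.isFetching`.

```javascript
// Hypothetical rewrite of `controller` as an async function.
// Its Promise always settles: either the queue is empty or a stop was requested.
// `shouldStop` is an assumed predicate supplied by the caller, not part of the original code.
async function controller(processBatch, shouldStop, EMPTY_QUEUE) {
  while (!shouldStop()) {
    const code = await processBatch();
    if (code === EMPTY_QUEUE) {
      return { isFetching: false, reason: 'empty' };
    }
  }
  return { isFetching: false, reason: 'killed' };
}
```

Because the stop condition is polled on every iteration, no timer or pending Promise has to be kept alive for the lifetime of the loop.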

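Update due to new details:

Currently, if someone POSTs to /kill and then immediately POSTs to /alive, she will be able to POST to / and start another loop in controller, because processBatch can take around 10 seconds to resolve its promise and the previous loop is still running. So anyone who repeatedly POSTs to /kill -> /alive -> / can effectively DoS your app. That is probably what is happening.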
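The race described above could be closed by tracking one explicit state instead of two independent booleans. The following is a rough sketch, not the original author's code: `createFetcher` and `registerRoutes` are hypothetical names, `app` is assumed to be an Express application, and the /alive route is dropped because a new loop may start only after the previous one has actually ended.

```javascript
// Hypothetical sketch: 'idle' -> 'running' -> 'stopping' -> 'idle'.
// A new loop may start only from 'idle', so /kill followed by / cannot stack loops.
function createFetcher(processBatch) {
  let state = 'idle';

  async function loop() {
    try {
      while (state === 'running') {
        await processBatch();
      }
    } finally {
      state = 'idle'; // only now may a new loop be started
    }
  }

  return {
    getState: () => state,
    // Returns true if a new loop was started, false if one is running or still stopping.
    start() {
      if (state !== 'idle') return false;
      state = 'running';
      loop().catch((err) => console.error(err));
      return true;
    },
    // Requests a stop; the loop exits after the batch in flight completes.
    kill() {
      if (state === 'running') state = 'stopping';
    },
  };
}

// Express-style wiring (assumes `app` exposes `post(path, handler)` like Express does).
function registerRoutes(app, fetcher) {
  app.post('/', (req, res) => {
    const started = fetcher.start();
    res.send({ success: started, state: fetcher.getState() });
  });
  app.post('/kill', (req, res) => {
    fetcher.kill();
    res.send({ success: true, state: fetcher.getState() });
  });
}
```

While a batch is still in flight after /kill, the state stays 'stopping' and a POST to / is refused, so at most one loop exists at any time.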

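Another update

The code q.push(URL, async (err, html) => { starts a new task and attaches a callback that is called once the task is done. The q counter is decreased before the callback is called. But the callback is asynchronous (async), and it performs another request: await sendDataToQueue({url: URL, content: html, identifier});.

As you can see, if sendDataToQueue runs slower than q, the callbacks accumulate and consume memory.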
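One way to apply backpressure is to make the send step part of the limited work, so that a slot is freed only after both the fetch and the send have finished. The sketch below uses a small hand-written limiter instead of async.queue so that it runs without dependencies. `createLimiter` and `processUrl` are hypothetical names, and `extractContent` and `sendDataToQueue` are passed in as stand-ins for the helpers in the question.

```javascript
// Hypothetical sketch: a tiny concurrency limiter in plain JavaScript.
// A slot is released only after the whole task (fetch AND send) has finished,
// so slow sends reduce the intake rate instead of piling up pending callbacks.
function createLimiter(concurrency) {
  let active = 0;
  const waiting = []; // resolvers of callers waiting for a free slot

  async function run(task) {
    if (active >= concurrency) {
      await new Promise((resolve) => waiting.push(resolve));
    } else {
      active++;
    }
    try {
      return await task();
    } finally {
      const next = waiting.shift();
      if (next) next(); // hand the slot to the next waiter (active count unchanged)
      else active--;    // nobody is waiting: release the slot
    }
  }

  return { run, pending: () => waiting.length, active: () => active };
}

// Assumed helpers with the same roles as in the question, passed in as parameters.
function processUrl(limiter, extractContent, sendDataToQueue, msg) {
  return limiter.run(async () => {
    const html = await extractContent(msg.url);
    await sendDataToQueue({ url: msg.url, content: html, identifier: msg.identifier });
  });
}
```

With this shape, the producer can check `limiter.pending()` the same way processBatch checks `q.length()`, and that number now also accounts for sends that are still in progress.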