Nodejs, Cloud Firestore Upload Tasks - Auth error:Error: socket hang up

Nodejs, Cloud Firestore Upload Tasks - Auth error:Error: socket hang up

我正在编写一个函数,该函数 运行s API 通过偏移量从一个巨大的数据库中按顺序调用和请求 JSON。解析 JSON 响应,然后将其中的后续数据上传到我们的 Cloud Firestore 服务器。

Nodejs (Node 6.11.3) & 最新的 Firebase Admin SDK

信息按预期解析,完美打印到控制台。但是,当数据尝试上传到我们的 Firestore 数据库时,控制台会收到错误消息:

Auth error:Error: socket hang up

(node:846) UnhandledPromiseRejectionWarning: Unhandled promise rejection (rejection id: -Number-): Error: Getting metadata from plugin failed with error: socket hang up

偶尔:

Auth error:Error: read ECONNRESET

forEach 函数从下载的 JSON 中收集项目并在上传到 Firestore 数据库之前处理数据。每个 JSON 最多有 1000 条数据(相当于 1000 个文档)要通过 forEach 函数。我知道如果该功能在上传集完成之前重复,这可能是个问题?

我是一个编码新手,我知道这个函数的控制流不是最好的。但是,我找不到有关控制台打印的错误的任何信息。我可以找到很多关于套接字挂起的信息,但是 none 在 Auth 错误部分。

我正在使用生成的服务帐户 JSON 作为访问我们数据库的凭据,该数据库使用 firebase-adminsdk 帐户。我们的 read/write 数据库规则目前是开放的,允许任何访问(因为我们正在开发中,没有真正的用户)。

这是我的函数:

Firebase 初始化和偏移清零

 const admin = require('firebase-admin');
    var serviceAccount = require("JSON");
    admin.initializeApp({
    credential: admin.credential.cert(serviceAccount),
    databaseURL: "URL"
    });
    var db = admin.firestore();
    var offset = 0;
    var failed = false;

运行函数&设置HTTP Headers

var runFunction = function runFunction() {
    var https = require('https');
    var options = {
        host: 'website.com',
        path: (path including an offset and 1000 row specifier),
        method: 'GET',
        json: true,
        headers: {
            'content-type': 'application/json',
            'Authorization': 'Basic ' + new Buffer('username' + ':' + 'password').toString('base64')
        }
    };

运行 HTTP 请求并重新运行 函数,如果我们还没有到达 API[=61 的响应末尾=]

if (failed === false) {
        var req = https.request(options, function (res) {
            var body = '';
            res.setEncoding('utf8');
            res.on('data', function (chunk) {
                body += chunk;
            });
            res.on('end', () => {
                console.log('Successfully processed HTTPS response');
                body = JSON.parse(body);
                if (body.hasOwnProperty('errors')) {
                    console.log('Body ->' + body)
                    console.log('API Call failed due to server error')
                    console.log('Function failed at ' + offset)
                    req.end();
                    return
                } else {
                    if (body.hasOwnProperty('result')) {
                        let result = body.result;
                        if (Object.keys(result).length === 0) {
                            console.log('Function has completed');
                            failed = true;
                            return;
                        } else {
                            result.forEach(function (item) {
                                var docRef = db.collection('collection').doc(name);
                                console.log(name);
                                var upload = docRef.set({
                                    thing: data,
                                    thing2: data,
                                })
                            });
                            console.log('Finished offset ' + offset)
                            offset = offset + 1000;
                            failed = false;
                        }
                        if (failed === false) {
                            console.log('Function will repeat with new offset');
                            console.log('offset = ' + offset);
                            req.end();
                            runFunction();
                        } else {
                            console.log('Function will terminate');
                        }
                    }
                }
            });
        });
        req.on('error', (err) => {
            console.log('Error -> ' + err)
            console.log('Function failed at ' + offset)
            console.log('Repeat from the given offset value or diagnose further')
            req.end();
        });
        req.end();
    } else {
        req.end();
    }
    };
    runFunction();

如有任何帮助,我们将不胜感激!

更新

我刚刚尝试更改我一次提取的 JSON 的行,然后使用该函数一次上传 - 从 1000 减少到 100。套接字挂起错误不太频繁所以肯定是数据库超载了。

理想情况下,如果每个 forEach 数组迭代都等待前一个迭代完成后再开始。

更新 #2

我已经安装了异步模块,我目前正在使用 async.eachSeries 函数一次执行一个文档上传。上传过程中的所有错误都会消失——但是该功能将花费大量时间才能完成(158,000 个文档大约需要 9 个小时)。我更新的循环代码是这样的,实现了一个计数器:

async.eachSeries(result, function (item, callback) {
    // result.forEach(function (item) {
    var docRef = db.collection('collection').doc(name);
    console.log(name);
    var upload = docRef.set({
      thing: data,
      thing2: data,
    }, { merge: true }).then(ref => {
        counter = counter + 1
        if (counter == result.length) {
            console.log('Finished offset ' + offset)
            offset = offset + 1000;
            console.log('Function will repeat with new offset')
            console.log('offset = ' + offset);
            failed = false;
            counter = 0
            req.end();
            runFunction();
        }
        callback()
    });
});

还有,一段时间后数据库returns出现这个错误:

(node:16168) UnhandledPromiseRejectionWarning: Unhandled promise rejection (rejection id: -Number-): Error: The datastore operation timed out, or the data was temporarily unavailable.

好像现在我的函数花费的时间太长了...而不是不够长。有没有人对如何使这个 运行 更快而没有规定的错误有任何建议?

作为此循环一部分的写入请求只是超出了 Firestore 的配额 - 因此服务器拒绝了其中的大部分请求。

为了解决这个问题,我将我的请求转换为一次上传大约 50 个项目的块,并使用 Promises 确认何时进行下一个块上传。

答案是post在这里 -> ,我的工作代码模板如下:

async function uploadData(dataArray) {
  try {
    const chunks = chunkArray(dataArray, 50);
    for (const [index, chunk] of chunks.entries()) {
      console.log(` --- Uploading ${index + 1} chunk started ---`);
      await uploadDataChunk(chunk);
      console.log(`---Uploading ${index + 1} chunk finished ---`);
    }
  } catch (error) {
    console.log(error)
    // Catch en error here
  }
}

function uploadDataChunk(chunk) {
  return Promise.all(
    chunk.map((item) => new Promise((resolve, reject) => {
      setTimeout(
        () => {
          console.log(`Chunk item ${item} uploaded`);
          resolve();
        },
        Math.floor(Math.random() * 500)
      );
    }))
  );
}

function chunkArray(array, chunkSize) {
  return Array.from(
    { length: Math.ceil(array.length / chunkSize) },
    (_, index) => array.slice(index * chunkSize, (index + 1) * chunkSize)
  );
}

将数据数组传递给 uploadData - 使用 uploadData(data); post 将每个项目的上传代码上传到 chunk.map 函数中 setTimeout 块内的 uploadDataChunk(在 resolve() 行之前)。

我在 之前解决了这个问题,每次之间等待 50 毫秒。

function Wait() {
    return new Promise(r => setTimeout(r, 50))
}

function writeDataToFirestoreParentPhones(data) {
    let chain = Promise.resolve();
    for (let i = 0; i < data.length; ++i) {
        var docRef = db.collection('parent_phones').doc(data[i].kp_ID_for_Realm);
        chain = chain.then(()=> {
            var setAda = docRef.set({
                parent_id: data[i].kf_ParentID,
                contact_number: data[i].contact_number,
                contact_type: data[i].contact_type
            }).then(ref => {
                console.log(i + ' - Added parent_phones with ID: ', data[i].kp_ID_for_Realm);
            }).catch(function(error) {
                console.error("Error writing document: ", error);
            });
        })
        .then(Wait)
    }
}

对我来说这是一个网络问题。

以前和今天使用 public 较慢的 wifi 连接,以 10,000 个批次上传 180,000 个文档对我来说没有问题,我收到了那个错误。

切换回我的 4G 移动连接解决了我的问题。不确定这是否是速度问题 - 可能是安全问题 - 但我会接受这个假设。