无法通过 setTimeout 解决承诺

Unable to resolve a promise with setTimeout

我正在编写一个函数来使用 Node.js 写入 AWS Kinesis firehose。根据操作结果写入 firehose 流 returns "error" 或 "data" 的 AWS 函数调用。如果出现错误,对于特定的错误代码,我需要使用 exponentialBackoff 重试相同的请求。我正在使用 setTimeOut 为每次后续重试触发相同的方法,但每次重试的时间都不同,但看起来每当有重试时,我都没有正确解决并且我的测试失败抱怨“错误:超时超过 5000 毫秒. 对于异步测试和挂钩,确保调用 "done()";如果返回 Promise,确保它已解析。"

async function batchWrite(records,firehose,retry = 0){
    var readingObjects = getReadings(records);
    var params = {
        DeliveryStreamName: process.env.KINESIS_FIREHOSE_STREAM_DELIVERY,
        Records: readingObjects
    };

    return await new Promise(function(resolve,reject){
        firehose.putRecordBatch(params,function(error,data){
            if(error){
                if(error.code == 'ServiceUnavailableException' && retry < retries.length){
                    console.log('retryCount=',retry);
                    setTimeout(batchWrite,retries[retry],records,firehose,retry+1);
                    console.log('setTimeout',retry);
                }
                else{
                    // console.log('Error',error);
                    console.log('resolving');
                    resolve(error);
                }
            }
            else{
                if(data.FailedPutCount > 0){
                    //colect the RequestResponses which are not processed.
                    //index of those records is same as in request
                    //process those faied records again
                    console.log('Error',data);
                }
                resolve(data);
            }
        });
    });
}

对于上面的函数,在测试用例中,我通过返回具有 ServiceUnavailableException 的响应来模拟错误情况,以便它重试,但在所有重试之后,测试超时。我需要更改什么才能使其正常工作吗?

当您调用 setTimeout() 进行重试时,您创建并返回的原始承诺永远不会得到解决。因此,调用者永远不会看到已解决的承诺。您需要将新的承诺链接到先前的承诺。而且,您不能直接使用 setTimeout() 来做到这一点,但是您可以这样做,我将其包装在自己的承诺中,然后将其链接到原始承诺。此外,如果您将 putRecordBatch 承诺为最低级别并使用承诺而不是承诺和回调的混合来执行所有控制流,则代码更易于管理。

你可以这样做:

// utility function to return a promise that is resolved after a setTimeout()
function delay(t, v) {
    return new Promise(resolve => {
        setTimeout(resolve, t, v);
    });
}

const promisify = require('util').promisify;

function batchWrite(records,firehose,retry = 0){
    var readingObjects = getReadings(records);
    var params = {
        DeliveryStreamName: process.env.KINESIS_FIREHOSE_STREAM_DELIVERY,
        Records: readingObjects
    };

    // make a promisified version of firehose.putRecordBatch
    firehose.putRecordBatchP = promisify(firehose.putRecordBatch);

    return firehose.putRecordBatchP(params).then(data => {
        if(data.FailedPutCount > 0){
            //collect the RequestResponses which are not processed.
            //index of those records is same as in request
            //process those failed records again
            console.log('Error',data);
        }
        return data;

    }).catch(error => {
        if(error.code == 'ServiceUnavailableException' && retry < retries.length){
            console.log('retryCount=',retry);

            // CHAIN the promise delay and retry here
            return delay(retries[retry]).then(() => {
                console.log('retry after delay #',retry);
                return batchWrite(records,firehose,retry+1); 
            });
        } else {
            // REJECT promise here upon error
            console.log('Error, rejecting promise');
            throw error;
        }

    });
}

关于您的代码,我有几处我不明白:

  1. 当您从 putRecordBatch() 收到不可重试的错误时,您似乎应该拒绝承诺,以便调用者知道操作失败。我更改了代码来做到这一点。

  2. 我不清楚 if(data.FailedPutCount > 0){...} 子句的用途或者为什么你所做的只是 console.log()