运行 块中的 HTTP 请求

Run HTTP requests in chunks

我想 运行 1 在可配置的块中发送 HTTP 请求,并在块请求之间设置可配置的超时。该请求基于 some.csv 文件提供的数据。

它不起作用,因为我得到一个 TypeError,但是当我在 f 之后删除 () 时,它也不起作用。 如果能提供一点帮助,我将不胜感激。可能最大的问题是我并不真正理解 promises 究竟是如何工作的,但我尝试了多种解决方案,但我无法实现我想要的。

超时功能可能会让我更加头疼,所以如果有任何提示,我将不胜感激。

你能帮我理解为什么它不起作用吗?

这是片段:

const rp = require('request-promise');
const fs = require('fs');
const { chunk } = require('lodash');

const BATCH_SIZE = 2;
const QUERY_PARAMS = ['clientId', 'time', 'changeTime', 'newValue'];

async function update(id, time, query) {
    const options = {
        method: 'POST',
        uri: `https://requesturl/${id}?query=${query}`,
        body: {
            "prop": {
                "time": time
            }
        },
        headers: {
            "Content-Type": "application/json"
        },
        json: true
    }

    return async () => { return await rp(options) };
}

async function batchRequestRunner(data) {
    const promises = [];
    for (row of data) {
        row = row.split(',');
        promises.push(update(row[0], row[1], QUERY_PARAMS.join(',')));
    }
    const batches = chunk(promises, BATCH_SIZE);

    for (let batch of batches) {
        try {
            Promise.all(
                batch.map(async f => { return await f();})
            ).then((resp) => console.log(resp));
        } catch (e) {
            console.log(e);
        }
    }
}

async function main() {
    const input = fs.readFileSync('./input.test.csv').toString().split("\n");
    const requestData = input.slice(1);

    await batchRequestRunner(requestData);
}

main();

对第一条评论的澄清:

我有一个如下所示的 csv 文件:

clientId,startTime
123,13:40:00
321,13:50:00

文件大小约为 100k 行 该文件包含如何更新数据库中特定 clientId 的时间的信息。我无权访问数据库,但我可以访问 API,它允许更新数据库中的条目。 我不能一次拨打 100k 个电话,因为:我的网络有限(我因为冠状病毒而远程工作),它消耗了大量内存,并且 API 也可能受到限制,如果我发出所有请求可能会崩溃立刻。

我想达到的目标:

好吧,这似乎是一个有点经典的案例,您希望通过一些异步操作处理一组值,并避免消耗太多资源或使目标服务器不堪重负,您希望拥有的不超过N 个请求同时在飞行中。这是一个常见问题,有针对它的预建解决方案。我的 goto 解决方案是一小段名为 mapConcurrent() 的代码。它与 array.map() 类似,但它假设有一个返回承诺的异步回调,并且您向它传递了应该同时运行的最大项目数。然后它 returns 给你一个解析为结果数组的承诺。

这里是mapConcurrent()

// takes an array of items and a function that returns a promise
// returns a promise that resolves to an array of results
function mapConcurrent(items, maxConcurrent, fn) {
    let index = 0;
    let inFlightCntr = 0;
    let doneCntr = 0;
    let results = new Array(items.length);
    let stop = false;

    return new Promise(function(resolve, reject) {

        function runNext() {
            let i = index;
            ++inFlightCntr;
            fn(items[index], index++).then(function(val) {
                ++doneCntr;
                --inFlightCntr;
                results[i] = val;
                run();
            }, function(err) {
                // set flag so we don't launch any more requests
                stop = true;
                reject(err);
            });
        }

        function run() {
            // launch as many as we're allowed to
            while (!stop && inflightCntr < maxConcurrent && index < items.length) {
                runNext();
            }
            // if all are done, then resolve parent promise with results
            if (doneCntr === items.length) {
                resolve(results);
            }
        }

        run();
    });
}

然后您的代码可以被构造为像这样使用它:

function update(id, time, query) {
    const options = {
        method: 'POST',
        uri: `https://requesturl/${id}?query=${query}`,
        body: {
            "prop": {
                "time": time
            }
        },
        headers: {
            "Content-Type": "application/json"
        },
        json: true
    }
    return rp(options);
}

function processRow(row) {
    let rowData = row.split(",");
    return update(rowData[0], rowData[1], rowData[2]);
}


function main() {
    const input = fs.readFileSync('./input.test.csv').toString().split("\n");
    const requestData = input.slice(1);    

    // process this entire array with up to 5 requests "in-flight" at the same time
    mapConcurrent(requestData, 5, processRow).then(results => {
        console.log(results);
    }).catch(err => {
        console.log(err);
    });
}

您显然可以将并发请求数调整为您想要的任何数量。我在这个例子中将它设置为 5。