Node-Express 一次可以触发多少个请求?

How many requests can Node-Express fire off at once?

我有一个脚本会从 AWS Athena(本质上是一个基于 PrestoDB 的关系型 SQL 数据库)中提取 25,000 条记录。如果为其中每一条记录各生成一个请求,就意味着我要向 Athena 发出 25,000 个请求,等数据返回后,还要再向我的 Redis 集群发出 25,000 个请求。

从 Node 一次向 Athena 发送多少个请求比较理想?

我之所以这样问,是因为我试过创建一个包含 25,000 个 Promise 的数组,然后对它调用 Promise.all(promiseArray),结果应用程序一直挂起。

所以我改为一次只发一个请求:用递归每次从数组中 splice 出第一个元素,等对应的 Promise resolve 之后,再把剩余的记录传回给同一个函数。

问题在于它需要很长时间。我休息了大约一个小时回来,还剩下 23,000 条记录。

我试着在 Google 上搜索 Node 和 Athena 一次能处理多少个请求,但一无所获。希望有人了解这方面的情况,能和我分享一下。

谢谢。

这是我的代码,仅供参考:

顺便说一下,我想做的改进是:根据执行速度,一次发送 4、5、6、7 或 8 个请求,而不是一次只发一个。

另外,使用 Node 集群(cluster 模块)会对这种场景的性能产生什么影响?

exports.storeDomainTrends = () => {
return new Promise((resolve, reject)=>{
    athenaClient.execute(`SELECT DISTINCT the_column from "the_db"."the_table"`,
    (err, data) =>  {
        var getAndStoreDomainData = (records) => {
            if(records.length){
                return new promise((resolve, reject) => {
                    var subrecords = records.splice(0, )[0]
                    athenaClient.execute(`
                    SELECT 
                    field,
                    field,
                    field,
                    SUM(field) as field
                    FROM "the_db"."the_table"
                    WHERE the_field IN ('Month') AND the_field = '`+ record.domain_name +`'
                    GROUP BY the_field, the_field, the_field
                    `, (err, domainTrend) => {

                        if(err) {
                            console.log(err)
                            reject(err)
                        }

                        redisClient.set(('Some String' + domainTrend[0].domain_name), JSON.stringify(domainTrend))
                        resolve(domainTrend);
                    })
                })
                .then(res => {
                    getAndStoreDomainData(records);
                })
            }
        }

        getAndStoreDomainData(data);

    })
})

}

使用这个 lib,你的代码大致如下:

/** Marker wrapper for a per-item failure; `reason` keeps whatever was captured ([error, domain] in this script). */
const Fail = function Fail(reason) {
  this.reason = reason;
};
/** True only for objects whose constructor is exactly `Fail`; falsy values are never failures. */
const isFail = (candidate) => Boolean(candidate) && candidate.constructor === Fail;
/**
 * Ask Athena for every distinct domain_name in the endpoints table.
 * @returns {Promise<*>} resolves with whatever the Athena client passes as data;
 *   rejects with the client's error.
 */
const distinctDomains = () => {
  const sql = `SELECT DISTINCT domain_name from "endpoint_dm"."bd_mb3_global_endpoints"`;
  return new Promise((resolve, reject) => {
    athenaClient.execute(sql, (err, data) => {
      if (err) {
        reject(err);
        return;
      }
      resolve(data);
    });
  });
};
/**
 * Quote a value as a Presto/Athena SQL string literal by doubling embedded
 * single quotes, so the value cannot break out of the surrounding '...'.
 */
const athenaStringLiteral = value => `'${String(value).replace(/'/g, "''")}'`;
/**
 * Run the monthly endpoint-count trend query for one domain.
 * @param {string} domain_name - domain to filter on; escaped before it is
 *   placed into the SQL text.
 * @returns {Promise<*>} resolves with the rows the Athena client hands back;
 *   rejects with the client's error.
 */
const domainDetails = domain_name =>
  new Promise(
    (resolve,reject)=>
      athenaClient.execute(
        `SELECT 
        timeframe_end_date,
        agg_type,
        domain_name,
        SUM(endpoint_count) as endpoint_count
        FROM "endpoint_dm"."bd_mb3_global_endpoints"
        WHERE agg_type IN ('Month') AND domain_name = ${athenaStringLiteral(domain_name)}
        GROUP BY timeframe_end_date, agg_type, domain_name`,
        (err, domainTrend) =>
            (err)
              ? reject(err)
              : resolve(domainTrend)
        )
  );
/**
 * Promise wrapper around the callback form of `redisClient.set`.
 * @param {*} keyValue - passed through unchanged as the first argument to set
 *   (this script passes a [key, value] array).
 * @returns {Promise<*>} resolves with the client's reply; rejects with its error.
 */
const redisSet = (keyValue) => {
  const executor = (resolve, reject) => {
    const onReply = (err, res) => (err ? reject(err) : resolve(res));
    redisClient.set(keyValue, onReply);
  };
  return new Promise(executor);
};
/**
 * Process domains in sequential batches. Within a batch, every domain is
 * fetched from Athena (through `limitFn`) and written to Redis concurrently.
 * The next batch starts only after the current one has fully settled.
 *
 * Curried so it can be dropped straight into a promise chain:
 * `process(batchSize)(limitFn)(resolveValue)(domains)`.
 *
 * @param {number} batchSize - domains per batch; must be >= 1 (fractions are floored).
 * @param {Function} limitFn - throttle wrapper: limitFn(fn) returns a throttled fn.
 * @param {Array} resolveValue - results collected so far; new results are appended.
 * @param {Array<{domain_name: string}>} domains - the domains to process.
 * @returns {Promise<Array>} a new array: resolveValue followed by one entry per
 *   domain — the domain object on success, or a Fail whose reason is
 *   [error, domain]. Rejects with a RangeError if batchSize < 1.
 */
const process = batchSize => limitFn => resolveValue => domains => {
  const size = Math.floor(batchSize);
  if (!(size >= 1)) {
    // A zero/negative/NaN batch size would never consume any domains.
    return Promise.reject(new RangeError(`batchSize must be >= 1, got ${batchSize}`));
  }

  // Fetch, store and verify one domain. Never rejects: any failure becomes a
  // Fail so that one bad domain does not abort its whole batch.
  const processDomain = domain =>
    limitFn(domainName => domainDetails(domainName))(domain.domain_name)
      .then(domainTrend =>
        // node_redis accepts the command arguments as a single array.
        redisSet([
          `Endpoint Profiles - Checkin Trend by Domain - Monthly - ${domainTrend[0].domain_name}`,
          JSON.stringify(domainTrend),
        ])
      )
      .then(redisReply =>
        // Redis answers a successful SET with the status reply "OK". The
        // ternary must be returned so a non-OK reply rejects into the .catch
        // below instead of becoming an unhandled rejection.
        redisReply === 'OK'
          ? domain
          : Promise.reject(new Error(`Redis reply not OK:${redisReply}`))
      )
      .catch(e => new Fail([e, domain]));

  // Iterative rather than recursive, so the promise chain does not grow with
  // the number of batches.
  const runBatches = async () => {
    let collected = resolveValue.concat();
    for (let start = 0; start < domains.length; start += size) {
      const results = await Promise.all(domains.slice(start, start + size).map(processDomain));
      console.log(`got ${results.length} results`);
      collected = collected.concat(results);
    }
    return collected;
  };

  return runBatches();
};
// Allow at most 5 concurrent Athena queries. `lib.throttle` comes from an
// external library that is not shown here.
const max5 = lib.throttle(5);
distinctDomains()// you may want to limit the results to 50 for testing
  // you may want to limit batch size to 10 for testing
  .then(process(1000)(max5)([]))// roughly 25000 domains here
  .then(
    results => {
      // Failed items are Fail instances whose .reason is [the error, domain].
      const successes = results.filter(x => !isFail(x));
      const failed = results.filter(isFail);
      console.log(`stored ${successes.length} domain trends, ${failed.length} failed`);
    }
  )
  .catch(
    // Without this, a failed distinctDomains() query (or any unexpected throw
    // in the chain) would surface as an unhandled promise rejection.
    err => console.error('storing domain trends failed:', err)
  );

你需要弄清楚 redis 客户端到底是怎么工作的;我试着查阅它的文档,结果跟问我家金鱼差不多。摸清客户端的行为之后,最好先用小批量数据试跑,看看有没有错误。使用 lib 之前必须先导入它,可以在这里(here)找到。

按照 Kevin B 的建议,我找到了一种更快的查询方法:修改查询语句,一次性从 Athena 取出所有域名的趋势数据,并按 domain_name 排序;然后以 Node 流(stream)的方式读取结果,这样就能在数据到达时把每个域名的数据拆分成各自独立的 JSON。

总之,这就是我最终的代码:

exports.storeDomainTrends = () => {
return new Promise((resolve, reject)=>{
    var streamObj = athenaClient.execute(`
    SELECT field,
            field,
            field,
            SUM(field) AS field
    FROM "db"."table"
    WHERE field IN ('Month')
    GROUP BY  field, field, field
    ORDER BY  field desc`).toStream();

    var data = [];

    streamObj.on('data', (record)=>{
        if (!data.length || record.field === data[0].field){
            data.push(record)
        } else if (data[0].field !== record.field){
            redisClient.set(('Key'), JSON.stringify(data))
            data = [record]
        }
    })

    streamObj.on('end', resolve);

    streamObj.on('error', reject);

})
.then()

}