如何限制 return 输出的打字稿功能

How to throttle typescript functions that return output

我正在使用打字稿编写 node.js 应用程序。我的应用程序将有多个服务相互通信。一些服务需要调用外部 API。这个API对每秒可以执行的调用次数有限制。因此,我想创建一个包装外部 API 调用的服务(我们称之为 ApiService)。其他服务将调用此服务,它将在队列中收集它们的请求并按顺序执行它们 - 每秒 N 个请求(为简单起见,我们假设每秒 1 个)。当服务 A 调用 ApiService 的方法时 - 它期望接收输出(接收 Promise 是可以的)。

现在我的问题是 - 如何在 ApiService 中对这些 API 调用进行排队,以便每 1 秒执行队列中的下一个调用以及 return 该 API 调用 ApiService 的调用者?

这是一个示例服务:

export class ServiceA {
   apiService: ApiService;

   public constructor(_apiService: ApiService) {
      apiService = _apiService;
   }

   public async DoWork() {
      // Do some stuff
      const output: number = await apiService.RetrieveA(param1, param2);
      // Do something with the output
   }
}

API 服务:

export class ApiService {
   queue: (() => Promise<any>)[] = [];

   public async RetrieveA(param1, param2): Promise<number> {
      const func = async () => {
         return this.CallApi(param1, param2);
      };

      this.queue.push(func);
      return func();
   }

   public async RunQueue() {
      while(true) {
         const func = this.queue.shift();
         if (!func) { continue; }
         // Call the function after 1 second
         await setTimeout(() => { func(); }, 1000);
      }
   }

   private async CallApi(param1, param2): Promise<number> {
      // Call the external API, process its output and return
   }
}

统筹整个事情的主要方法:

var CronJob = require('cron').CronJob;

const apiService = new ApiService();
const service = new ServiceA(apiService);

new CronJob('* * * * * *', function() {
   service.DoWork();
}, null, true);

apiService.RunQueue();

我面临的问题是,当 RetrieveA 方法 returns func() - 函数被执行。我需要 return 一个 Promise,但实际的函数执行需要在 RunQueue() 方法中进行。有没有办法做到这一点?我可以 return 一个承诺而不立即执行函数并等待这个承诺 - 在 RunQueue 方法中调用函数时接收输出吗?

或者是否有其他方法可以解决 API 调用 return 输出的节流问题?

我是 Node.js/Typescript/JavaScript 世界的新手,非常感谢您的帮助:)

我确实设法找到了可行的解决方案。我对 JavaScript 中的整个 Promise 和异步概念不是很熟悉,所以这可能不是最好的解决方案,但它可以满足我的具体情况。以下是希望实现类似功能的其他人的代码:

示例ServiceA与上面相同:

export class ServiceA {
   apiService: ApiService;

   public constructor(_apiService: ApiService) {
      apiService = _apiService;
   }

   public async DoWork() {
      // Do some stuff
      const output: number = await apiService.RetrieveA(param1, param2);
      // Do something with the output
   }
}

这是 returns 承诺输出并限制实际函数执行的修改后的 ApiService:

export class ApiService {
   // We keep the functions that need to be executed in this queue
   // and process them sequentially
   queue: (() => void)[] = [];

   public async RetrieveA(param1, param2): Promise<number> {
      // This resolver serves two purposes - it will be called when the
      // function is executed (to set the output), but will also be part
      // of the Promise that will be returned to the caller (so that the
      // caller can await for the result).
      let resolver: (value: number) => void;

      // This function will be executed by the RunQueue method when its
      // turn has come. It makes a call to the external API and when that
      // call succeeds - the resolver is called to return the result through
      // the Promise.
      const func = async () => {
         return this.CallApi(param1, param2).then(resolver);
      };

      this.queue.push(func);

      // This is the promise that we return to the caller, so that he
      // can await for the result. 
      const promise = new Promise<number>((resolve, reject) => {
         resolver = resolve;
      });

      return promise;
   }

   public async Run() {
      this.RunQueue(this.queue);
   }

   private async RunQueue(funcQueue: (() => void)[]) {
      // Get the first element of the queue
      const func = funcQueue.shift();

      // If the queue is empty - this method will continue to run
      // until a new element is added to the queue
      if (func) {
         await func();
      }

      // Recursively call the function again after 1 second
      // This will process the next element in the queue
      setTimeout(() => {
         this.RunQueue(funcQueue);
      }, 1000);
   }

   private async CallApi(param1, param2): Promise<number> {
      // Call the external API, process its output and return
   }
}

我希望代码中的注释能够清楚地说明我正在努力实现什么(以及如何实现)。

如果您想将对 RetreiveA 的调用限制为每秒 2 次,所有这些都可以简单得多:

//lib is here: https://github.com/amsterdamharu/lib/blob/master/src/index.js
import * as lib from '../../src/index'
const twoPerSecond = lib.throttlePeriod(2,1000);
export class ApiService {
  public RetrieveA(param1, param2): Promise<number> {
    //removed the resolver part, according to the typescript signature
    //  it should return a promise of number but resolver actually takes
    //  that number and returns void (undefined?)
    return twoPerSecond(this.CallApi.bind(this))([param1, param2]);
  }
  //change the signature of this function to take one parameter
  //  but deconsruct the array to param1 and param2
  private async CallApi([param1, param2]): Promise<number> {
    // Call the external API, process its output and return
  }
}

你的方法只有在这个 class 只有一个实例时才有效。如果您要创建多个实例并在这些实例上调用 RetrieveA,则您不再将请求限制为 callApi.