暂停异步函数,直到满足 dart 中的条件

Pause an async function until a condition is met in dart

我有一个小部件可以执行一系列昂贵的计算(在计算核心上执行)。计算在小部件的列表中排队,等待每个计算。计算在构建函数中排队,因此在发送到计算核心之前需要在主隔离上执行一些计算,我认为不要直接在构建函数中处理它们,而是在构建时将它们添加到队列中.

这是我目前要用完队列的函数:

void emptyQueue() async {

  while(queue.isNotEmpty) {
    /* Perform some computations here, which prevents me from just sending the data to the compute core directly (they need data that is only in the main isolate and it's too big to send to the compute core) */

    /* Send the data to the worker isolate */
    await Future.delayed(Duration(milliseconds: 1000));

    /* Remove the first element from the queue. */
    queue.removeFirst();
  }
}

当函数被调用时,这个函数运行良好,因为队列是固定的,之后不再向它添加元素,因为它只是循环直到队列为空。我的问题是我可以在任何时间点向队列添加请求,即使队列为空,到那时功能将完全完成,因此队列不会再耗尽。

我认为这样的事情可能会奏效,但到目前为止我无法具体实施它。

void emptyQueue() async {
    while (true) {
      if (queue.isNotEmpty()) {
        /* Perform some computations here (gather data to send to the compute core) */

        /* Send the data to the worker isolate */
        await Future.delayed(Duration(milliseconds: 1000));

        /* Remove the first element from the queue. */
        queue.removeFirst();
      }

      await /* a signal of some sort */;
    }
  }

这样,我可以在 initState 中启动 emptyQueue,因为它会暂停,直到有内容添加到队列中。此外,在构建时调用的函数只是 queue.add(item),没有任何主要的隔离端计算。

有什么方法可以在最后(最后 await)暂停该功能?

Streams 最适合这种场景。考虑以下..

import 'dart:async';

// creating a stream
StreamController<String> streamController = StreamController();

void main(){
  
  // stream listener
  streamController.stream.listen((value){
    print(value); // print message to terminal
  });
  
  // adding object to stream
  streamController.add("hello");
  streamController.add("bye");
  streamController.add("seeya");
}

在这里,每当您将对象(在本例中为字符串)添加到流中时,都会调用侦听器来执行您在其中定义的任何内容。在我的例子中,代码将打印 hello byeseeya 到终端。

同样,无论何时向流中添加内容,都可以使用流来启动一些代码。

我能够通过等待 Completer.

来暂停 emptyQueue
void emptyQueue() async {
    while (true) {
      if (queue.isNotEmpty()) {
        /* Perform some computations here (gather data to send to the compute core) */

        /* Send the data to the worker isolate */
        await Future.delayed(Duration(milliseconds: 1000));

        /* Remove the first element from the queue. */
        queue.removeFirst();
      }

      /* Waiting for queue items */
      if (!_queueFreezerCompleter.isCompleted) _queueFreezerCompleter.complete();
      _queueFreezerCompleter = Completer();

      await _queueFreezerCompleter.future;
    }
  }

完成者只是 State class 的一个属性。

Completer _queueFreezerCompleter = Completer();

这个 Completer 在新项目被添加到队列时完成(仅当它尚未完成时),这意味着外部循环 while(true) 将重新启动并且队列将被清空再次.

void onAddQueue(){
   queue.enqueue(value);
   if (!_queueFreezerCompleter.isCompleted) _queueFreezerCompleter.complete();
}

使用此解决方案还可以从队列中删除项目(如果它们尚未被处理)