对同一流的多次访问

Multiple Access to the Same Stream

上下文

下面的代码表示一个抽象,其中 MyClass 是某种下载管理器。

import 'dart:async';

Future<void> main() async {
  MyClass().test().listen((v) => print('t1: $v')).onError(print);
  final commomClass = MyClass();
  commomClass.test().listen((v) => print('t2: $v')).onError(print);
  commomClass.test().listen((v) => print('t3: $v')).onError(print);
}

class MyClass {
  bool _isDownloadInProgress = false;
  int _i = 0;
  StreamController<int> _sc;
  
  Stream<int> test() async* {
    if (_isDownloadInProgress) {
      throw Exception('Download already in progress');
    } else {
      _sc = StreamController<int>();
    }
    
    Timer.periodic(
      const Duration(seconds: 1),
      (t) {
        if (_i == 4) {
          _isDownloadInProgress = false;
          _i = 0;
          _sc.close();
          t.cancel();
        } else {
          _sc.add(_i++);
        }
      },
    );

    yield* _sc.stream;
  }
}

问题

我预计在执行这段代码后,它会生成值 t1 和 t2,而输出 t3 只会生​​成一次 'Download already in progress'。例如:

t1: 0
t2: 0
t3: Download already in progress
t1: 1
t2: 1
t1: 2
t2: 2
t1: 3
t2: 3

但它输出所有 4 t1 值,8 t3 值和 没有 'Download already in progress' 留言:

t1: 0
t3: 0
t3: 1
t1: 1
t3: 2
t3: 3
t1: 2
t3: 0
t1: 3
t3: 1
t3: 2
t3: 3

对我来说,t1 值会正确输出,t2 也会正确输出,t3 会输出 'Download already in progress' 消息,因为一切都是作为 运行 异步,它将尝试 'download' 已经下载的内容(因为 test() 方法是在 MyClass 的同一实例上调用的)。

我错过了什么?

您忘记在函数内部设置 _isDownloadInProgress = true;

尝试:

if (_isDownloadInProgress) {
    throw Exception('Download already in progress');
} else {
  _sc = StreamController<int>();
  _isDownloadInProgress = true;
}

输出

Exception: Download already in progress
t1: 0
t2: 0
t1: 1
t2: 1
t1: 2
t2: 2
t1: 3
t2: 3

您可以通过延迟开始让 t3 更有趣:

Future.delayed(const Duration(seconds: 2),
      () => commomClass.test().listen((v) => print('t3: $v')).onError(print));

输出

t1: 0
t2: 0
Exception: Download already in progress
t1: 1
t2: 1
t1: 2
t2: 2
t1: 3
t2: 3

对于初学者,您的代码永远不会将 _isDownloadInProgress 设置为 true,因此没有理由显示“正在下载”。

这实际上是导致第二个错误的原因。当您调用 t3 listen 时,由于 _isDownloadInProgress 始终为 false,这会导致 _sc 除了新的 Timer.periodic 入队外还被覆盖。当每个计时器触发时,它引用 _sc,现在是包含 t3 listen 的计时器,因此您最终会得到两个计时器将事件推送到同一个流控制器,这就是为什么您会看到双倍的 t3事件。

只需在实例化计时器之前设置 _isDownloadInProgress = true 就足以获得预期结果:

class MyClass {
  bool _isDownloadInProgress = false;
  int _i = 0;
  StreamController<int> _sc;
  
  Stream<int> test() async* {
    if (_isDownloadInProgress) {
      throw Exception('Download already in progress');
    } else {
      _sc = StreamController<int>();
    }
    
    _isDownloadInProgress = true; // Add this line
    Timer.periodic(
      const Duration(seconds: 1),
      (t) {
        if (_i == 4) {
          _isDownloadInProgress = false;
          _i = 0;
          _sc.close();
          t.cancel();
        } else {
          _sc.add(_i++);
        }
      },
    );

    yield* _sc.stream;
  }
}

结果:

Exception: Download already in progress
t1: 0
t2: 0
t1: 1
t2: 1
t1: 2
t2: 2
t1: 3
t2: 3