如何打破飞镖中流的循环?

How to break the loop for a stream in dart?

我知道 StreamSubscription 可以中止监听。但出于某种原因,我无法为 File.openRead() 调用监听。如何中止读取操作流?

import 'dart:io';
import 'dart:async';

class Reader {
  Stream<int> progess(File file) async* {
    var sum = 0;
    var fs = file.openRead();
    await for (var d in fs) {
      // consume d
      sum += d.length;
      yield sum;
    }
  }

  void cancel() {
    // How to abort the above loop without using StreamSubscription returned by listen().
  }
}


void main() async {
  var reader = Reader();
  var file = File.new("a.txt");
  reader.progess(file).listen((p) => print("$p"));
  // How to cancel it without 
  Future.delayed(Duration(seconds: 1), () { reader.cancel()});
}

如果不在流订阅上调用 cancel,则无法取消流订阅。

您也许可以通过其他方式中断流生成器,使用“侧通道”要求它停止生成值。那不是流取消,更像是过早的流关闭

示例:

class Reader {
  bool _cancelled = false;
  Stream<int> progess(File file) async* {
    var sum = 0;
    var fs = file.openRead();
    await for (var d in fs) {
      // consume d
      sum += d.length;
      if (_cancelled) return; // <---
      yield sum;
    }
  }

  void cancel() {
    _cancelled = true;
  }
}

另一种选择是创建一个可以中断流的通用流包装器。也许像

import"dart:async";
class CancelableStream<T> extends Stream<T> {
  final Stream<T> _source;
  final Set<_CancelableStreamSubscription<T>> _subscriptions = {};
  CancelableStream(Stream<T> source) : _source = source;
  @override
  StreamSubscription<T> listen(
       onData, {onError, onDone, cancelOnError}) {
    var sub = _source.listen(onData, 
        onError: onError, onDone: onDone, cancelOnError: cancelOnError);
    var canSub = _CancelableStreamSubscription<T>(sub, this, cancelOnError ?? false);
    _subscriptions.add(canSub);
    return canSub;
  }
  void cancelAll() {
    while (_subscriptions.isNotEmpty) {
      _subscriptions.first.cancel();
    }
  }
}

class _CancelableStreamSubscription<T> implements StreamSubscription<T> {
  final bool _cancelOnError;
  final StreamSubscription<T> _source;
  final CancelableStream<T> _owner;
  _CancelableStreamSubscription(
      this._source, this._owner, this._cancelOnError);
  @override
  Future<void> cancel() {
    _owner._subscriptions.remove(this);
    return _source.cancel();
  }
  @override
  void onData(f) => _source.onData(f);
  @override
  void onError(f) {
    if (!_cancelOnError) {
      _source.onError(f);
    } else {
      _source.onError((Object e, StackTrace s) {
        _owner._subscriptions.remove(this);
        if (f is void Function(Object, StackTrace)) {
          f(e, s);
        } else {
          f?.call(e);
        } 
      });
    }
  }
  @override
  bool get isPaused => _source.isPaused;
  @override
  void onDone(f) => _source.onDone(() {
     _owner._subscriptions.remove(this);
     f?.call();
  });
  @override
  void pause([resumeFuture]) => _source.pause(resumeFuture);
  @override
  void resume() => _source.resume;
  @override
  Future<E> asFuture<E>([E? value]) => _source.asFuture(value);
}

然后您可以像这样使用它:

void main() async {
  Stream<int> foo() async* {
    yield 1;
    yield 2;
    yield 3;
    yield 4;
  }
  var s = CancelableStream<int>(foo());
  await for (var x in s) {
    print(x);
    if (x == 2) s.cancelAll();
  }
}