如何测试多个 Stream 是否按特定顺序发出事件?

How to test whether multiple Streams emit in a specific order?

我想验证多个流是否按特定顺序发出元素。

假设有以下两个流。

Stream<String> data;
Stream<SomeStatus> status;

对于单个流,可以使用 emitsInOrder 进行如下验证:

expect(data, emitsInOrder(["a", "b", "c"]));
expect(status, emitsInOrder([SomeStatus.loading, SomeStatus.connected, SomeStatus.disconnected]));

但是,我怎样才能轻松地测试多个流之间的发出顺序,例如像下面这样:

Map map = {"data": data, "status": status};
expect(map, emitsInOrder([
   {"status": SomeStatus.loading},
   {"status": SomeStatus.connected},
   {"data": "a"},
   ...  // you get the idea
]));

谢谢

编辑:我已尝试自己解决此问题,并把方案作为答案发布在下面,欢迎任何反馈。

这是我想出的:

import 'dart:async';
import 'package:rxdart/rxdart.dart';
import 'package:test/test.dart';

/// Associates a [stream] with a string [key] and builds the events that
/// describe what happened on that stream.
///
/// Every event produced by [emitted], [done] and [errored] carries [key], so
/// events originating from different streams can be told apart.
class StreamKey<T> {
    /// Creates a key labelling [stream] with [key].
    const StreamKey(this.key, this.stream);

    /// Label copied into every event built by this object.
    final String key;

    /// The stream this object refers to.
    final Stream<T> stream;

    /// Returns the event stating that the value [t] was emitted.
    StreamEventType emitted(T t) {
        return StreamEmitted(key, t);
    }

    /// Returns the event stating that the stream completed.
    StreamEventType done() {
        return StreamDone(key);
    }

    /// Returns the event stating that the stream reported [error].
    StreamEventType errored(dynamic error) {
        return StreamErrored(key, error);
    }
}

/// Merges the events of every stream in [keys] into one stream of
/// [StreamEventType]s and checks that merged stream against [matcher].
///
/// A data event is forwarded as `key.emitted(data)`, an error as
/// `key.errored(error)` and a completion as `key.done()`. A source is dropped
/// after its first error or once it is done. When no source is left (including
/// the case where [keys] is empty) the merged stream is closed, so [matcher]
/// may end with `emitsDone`.
///
/// The expectation fails if two entries of [keys] share the same key string.
void expectStreamsInOrder(List<StreamKey> keys, dynamic matcher) {
    // Active subscriptions, indexed by the key string of their source.
    final subs = <String, StreamSubscription>{};

    expect(keys.map((a) => a.key).toSet().length, keys.length, reason: "Keys need to be unique");

    final s = PublishSubject<StreamEventType>();

    // Attach the matcher before subscribing to any source.
    expect(s, matcher);

    void checkEmpty() {
        assert(!s.isClosed);
        if (subs.isEmpty) {
            s.close();
        }
    }

    // Drops the source registered under [key] and closes the merged stream
    // when it was the last one.
    void finish(String key) {
        // `Map.remove` returns null for an unknown key; `?.` avoids calling
        // `cancel` on null.
        subs.remove(key)?.cancel();
        checkEmpty();
    }

    for (final key in keys) {
        subs[key.key] = key.stream.listen(
            (data) {
                s.add(key.emitted(data));
            },
            // An error is treated as terminal for the source that raised it.
            onError: (error) {
                s.add(key.errored(error));
                finish(key.key);
            },
            onDone: () {
                s.add(key.done());
                finish(key.key);
            },
        );
    }

    // With no sources there is nothing that would ever trigger `finish`, so
    // close right away instead of leaving the matcher waiting.
    if (keys.isEmpty) {
        s.close();
    }
}

/// Base class for events that are tagged with a [key].
abstract class StreamEventType {
    /// Initialises the event with the [key] it is tagged with.
    const StreamEventType(this.key);

    /// Tag carried by this event.
    final String key;
}

/// Event carrying an emitted [data] value together with the [key] it is
/// tagged with.
class StreamEmitted<T> extends StreamEventType {
    /// Creates the event for the value [data] tagged with [key].
    StreamEmitted(String key, this.data) : super(key);

    /// The emitted value.
    final T data;

    /// Two events are equal when their runtime type, [key] and [data] are
    /// equal.
    @override
    bool operator ==(Object other) {
        if (identical(this, other)) {
            return true;
        }
        if (other is StreamEmitted) {
            return runtimeType == other.runtimeType &&
                key == other.key &&
                data == other.data;
        }
        return false;
    }

    @override
    int get hashCode {
        final dataHash = data.hashCode;
        final keyHash = key.hashCode;
        return dataHash ^ keyHash;
    }

    @override
    String toString() {
        return 'StreamEmitted{key: $key, data: $data}';
    }
}

/// Event stating that the source tagged with [key] is done.
class StreamDone extends StreamEventType {
    /// Creates the completion event tagged with [key].
    StreamDone(String key) : super(key);

    /// Two events are equal when their [key] and runtime type are equal.
    @override
    bool operator ==(Object other) {
        if (identical(this, other)) {
            return true;
        }
        if (other is StreamDone) {
            return key == other.key && runtimeType == other.runtimeType;
        }
        return false;
    }

    @override
    int get hashCode {
        return key.hashCode;
    }

    @override
    String toString() {
        return 'StreamDone{key: $key}';
    }
}

/// Event carrying an [error] together with the [key] it is tagged with.
class StreamErrored extends StreamEventType {
    /// Creates the error event for [error] tagged with [key].
    StreamErrored(String key, this.error) : super(key);

    /// The reported error object.
    final dynamic error;

    /// Two events are equal when their runtime type, [key] and [error] are
    /// equal.
    @override
    bool operator ==(Object other) {
        if (identical(this, other)) {
            return true;
        }
        if (other is StreamErrored) {
            return runtimeType == other.runtimeType &&
                key == other.key &&
                error == other.error;
        }
        return false;
    }

    @override
    int get hashCode {
        final errorHash = error.hashCode;
        final keyHash = key.hashCode;
        return errorHash ^ keyHash;
    }

    @override
    String toString() {
        return 'StreamErrored{key: $key, error: $error}';
    }
}

import 'package:rxdart/rxdart.dart';
import 'package:test/test.dart';

import 'streams_in_order.dart';

/// Tests for `expectStreamsInOrder`.
///
/// Each test registers the expected interleaving first and then drives two
/// subjects (`data` and `status`) to produce it.
void main() {
    // A plain string is used as the group name: interpolating the function
    // itself would insert the closure's `toString()` instead of its name.
    group("expectStreamsInOrder", () {
        Subject<String> data; // ignore: close_sinks
        Subject<SomeStatus> status; // ignore: close_sinks
        StreamKey<String> keyData;
        StreamKey<SomeStatus> keyStatus;

        // Fresh subjects and keys for every test so no state is shared.
        setUp(() {
            data = PublishSubject(sync: true);
            status = PublishSubject(sync: true);
            keyData = StreamKey("data", data);
            keyStatus = StreamKey("status", status);
        });

        test("example", () async {
            expectStreamsInOrder([keyData, keyStatus], emitsInOrder([
                keyStatus.emitted(SomeStatus.loading),
                keyStatus.emitted(SomeStatus.connected),
                keyData.emitted("a"),
                keyStatus.emitted(SomeStatus.disconnected),
                keyData.done(),
                keyStatus.done(),
                emitsDone,
            ]));

            status.add(SomeStatus.loading);
            status.add(SomeStatus.connected);
            data.add("a");
            status.add(SomeStatus.disconnected);
            await data.close();
            await status.close();
        });


        test("example with error", () async {
            expectStreamsInOrder([keyData, keyStatus], emitsInOrder([
                keyStatus.emitted(SomeStatus.loading),
                keyStatus.emitted(SomeStatus.connected),
                keyData.emitted("a"),
                keyStatus.emitted(SomeStatus.disconnected),
                keyData.errored("err1"),
                keyStatus.errored("err2"),
                emitsDone,
            ]));

            status.add(SomeStatus.loading);
            status.add(SomeStatus.connected);
            data.add("a");
            status.add(SomeStatus.disconnected);
            // Both sources end with an error instead of being closed.
            data.addError("err1");
            status.addError("err2");
        });

        test("error on one stream, then done on the other", () async {
            expectStreamsInOrder([keyData, keyStatus], emitsInOrder([
                keyData.emitted("aa"),
                keyStatus.emitted(SomeStatus.loading),
                keyData.errored(123),
                keyStatus.done(),
                emitsDone,
            ]));

            data.add("aa");
            status.add(SomeStatus.loading);
            data.addError(123);
            await status.close();
        });

        test("done on one stream, then error on the other", () async {
            expectStreamsInOrder([keyData, keyStatus], emitsInOrder([
                keyData.emitted("aa"),
                keyStatus.emitted(SomeStatus.loading),
                keyStatus.done(),
                keyData.errored(123),
                emitsDone,
            ]));

            data.add("aa");
            status.add(SomeStatus.loading);
            await status.close();
            data.addError(123);
        });
    });
}

/// Sample status values emitted by the `status` stream in the tests.
enum SomeStatus { loading, connected, disconnected }