如何将多个流合并为更高级别的流?
How can I merge multiple Streams into a higher level Stream?
我有两个流,Stream<A>
和 Stream<B>
。我有一个类型 C
的构造函数,它接受一个 A
和一个 B
。如何将两个 Stream
合并为一个 Stream<C>
?
import 'dart:async' show Stream;
import 'package:async/async.dart' show StreamGroup;
main() async {
var s1 = stream(10);
var s2 = stream(20);
var s3 = StreamGroup.merge([s1, s2]);
await for(int val in s3) {
print(val);
}
}
Stream<int> stream(int min) async* {
int i = min;
while(i < min + 10) {
yield i++;
}
}
另见 http://news.dartlang.org/2016/03/unboxing-packages-async-part-2.html
打印
10
20
11
21
12
22
13
23
14
24
15
25
16
26
17
27
18
28
19
29
您可以在 package:async
中使用 StreamZip
将两个流组合成一对流,然后从中创建 C
对象。
import "package:async" show StreamZip;
...
Stream<C> createCs(Stream<A> as, Stream<B> bs) =>
new StreamZip([as, bs]).map((ab) => new C(ab[0], ab[1]));
如果您需要在 Stream<A>
或 Stream<B>
发出事件时做出反应并使用两个流中的最新值,请使用 combineLatest
.
Stream<C> merge(Stream<A> streamA, Stream<B> streamB) {
return streamA
.combineLatest(streamB, (a, b) => new C(a, b));
}
对于需要合并两个以上 不同类型 流并在每次更新任何流时获取所有最新值的人。
import 'package:stream_transform/stream_transform.dart';
Stream<List> combineLatest(Iterable<Stream> streams) {
final Stream<Object> first = streams.first.cast<Object>();
final List<Stream<Object>> others = [...streams.skip(1)];
return first.combineLatestAll(others);
}
组合流将产生:
streamA: a----b------------------c--------d---|
streamB: --1---------2-----------------|
streamC: -------&----------%---|
combined: -------b1&--b2&---b2%---c2%------d2%-|
为什么不 StreamZip
?因为 StreamZip
会产生:
streamA: a----b------------------c--------d---|
streamB: --1---------2-----------------|
streamC: -------&----------%---|
combined: -------a1&-------b2%--|
用法:
Stream<T> sA;
Stream<K> sB;
Stream<Y> sC;
combineLatest([sA, sB, sC]).map((data) {
T resA = data[0];
K resB = data[1];
Y resC = data[2];
return D(resA, resB, resC);
});
要在第二个流从第一个流获取结果时合并两个流,请使用 asyncExpand
Stream<UserModel?> getCurrentUserModelStream() {
return FirebaseAuth.instance.authStateChanges().asyncExpand<UserModel?>(
(currentUser) {
if (currentUser == null) {
return Stream.value(null);
}
return FirebaseFirestore.instance
.collection('users')
.doc(currentUser.uid)
.snapshots()
.map((doc) {
final userData = doc.data();
if (userData == null) {
return null;
}
return UserModel.fromJson(userData);
});
},
);
}
我有两个流,Stream<A>
和 Stream<B>
。我有一个类型 C
的构造函数,它接受一个 A
和一个 B
。如何将两个 Stream
合并为一个 Stream<C>
?
import 'dart:async' show Stream;
import 'package:async/async.dart' show StreamGroup;
main() async {
var s1 = stream(10);
var s2 = stream(20);
var s3 = StreamGroup.merge([s1, s2]);
await for(int val in s3) {
print(val);
}
}
Stream<int> stream(int min) async* {
int i = min;
while(i < min + 10) {
yield i++;
}
}
另见 http://news.dartlang.org/2016/03/unboxing-packages-async-part-2.html
打印
10
20
11
21
12
22
13
23
14
24
15
25
16
26
17
27
18
28
19
29
您可以在 package:async
中使用 StreamZip
将两个流组合成一对流,然后从中创建 C
对象。
import "package:async" show StreamZip;
...
Stream<C> createCs(Stream<A> as, Stream<B> bs) =>
new StreamZip([as, bs]).map((ab) => new C(ab[0], ab[1]));
如果您需要在 Stream<A>
或 Stream<B>
发出事件时做出反应并使用两个流中的最新值,请使用 combineLatest
.
Stream<C> merge(Stream<A> streamA, Stream<B> streamB) {
return streamA
.combineLatest(streamB, (a, b) => new C(a, b));
}
对于需要合并两个以上 不同类型 流并在每次更新任何流时获取所有最新值的人。
import 'package:stream_transform/stream_transform.dart';
Stream<List> combineLatest(Iterable<Stream> streams) {
final Stream<Object> first = streams.first.cast<Object>();
final List<Stream<Object>> others = [...streams.skip(1)];
return first.combineLatestAll(others);
}
组合流将产生:
streamA: a----b------------------c--------d---|
streamB: --1---------2-----------------|
streamC: -------&----------%---|
combined: -------b1&--b2&---b2%---c2%------d2%-|
为什么不 StreamZip
?因为 StreamZip
会产生:
streamA: a----b------------------c--------d---|
streamB: --1---------2-----------------|
streamC: -------&----------%---|
combined: -------a1&-------b2%--|
用法:
Stream<T> sA;
Stream<K> sB;
Stream<Y> sC;
combineLatest([sA, sB, sC]).map((data) {
T resA = data[0];
K resB = data[1];
Y resC = data[2];
return D(resA, resB, resC);
});
要在第二个流从第一个流获取结果时合并两个流,请使用 asyncExpand
Stream<UserModel?> getCurrentUserModelStream() {
return FirebaseAuth.instance.authStateChanges().asyncExpand<UserModel?>(
(currentUser) {
if (currentUser == null) {
return Stream.value(null);
}
return FirebaseFirestore.instance
.collection('users')
.doc(currentUser.uid)
.snapshots()
.map((doc) {
final userData = doc.data();
if (userData == null) {
return null;
}
return UserModel.fromJson(userData);
});
},
);
}