如何在 Dart 中创建 StreamTransformer?
How to create a StreamTransformer in Dart?
尝试构建自定义 StreamTransformer class,但是很多示例似乎已经过时,而在文档中找到的不是(某些类型化语言可能会考虑的内容) ) 作为 class(在此处找到:https://api.dartlang.org/apidocs/channels/stable/dartdoc-viewer/dart:async.StreamTransformer)。这看起来不像是一种非常类似于 Dart 的处理方式,而更像是一种类似 Javascript 的方式(我使用 Dart 来避免这种方式)。
许多在线资源说这是创建 StreamTransformer 的方式,但是在扩展它时出现错误。
class exampleStreamTransformer extends StreamTransformer
{
//... (This won't work)
}
'Implements' 似乎是要走的路,同时实现所需的绑定功能:
class exampleStreamTransformer implements StreamTransformer
{
Stream bind(Stream stream)
{
//... (Go on to return new stream, etc)
}
}
我似乎找不到任何这样的例子,但我自己把一些东西放在一起(这在我的 IDE 中被接受,但在运行时不被接受,当它尝试时我得到一个空对象错误使用暂停吸气剂):
class exampleStreamTransformer implements StreamTransformer
{
StreamController<String> _controller;
StreamSubscription<String> _subscription;
Stream bind(Stream stream)
{
_controller = new StreamController<String>(
onListen: ()
{
_subscription = stream.listen((data)
{
// Transform the data.
_controller.add(data);
},
onError: _controller.addError,
onDone: _controller.close,
cancelOnError: true); // Unsure how I'd pass this in?????
},
onPause: _subscription.pause,
onResume: _subscription.resume,
onCancel: _subscription.cancel,
sync: true
);
return _controller.stream;
}
}
想以这种方式实现它,就像 'typed' 生成 class 的方式一样,非常感谢任何帮助,谢谢。
你为什么不用StreamTransformer.fromHandler()
:
import 'dart:async';
void handleData(data, EventSink sink) {
sink.add(data*2);
}
void main() {
StreamTransformer doubleTransformer = new StreamTransformer.fromHandlers(handleData: handleData);
StreamController controller = new StreamController();
controller.stream.transform(doubleTransformer).listen((data) {
print('data: $data');
});
controller.add(1);
controller.add(2);
controller.add(3);
}
输出:
data: 2
data: 4
data: 6
好的。这是另一个工作示例:
import 'dart:async';
class DuplicateTransformer<S, T> implements StreamTransformer<S, T> {
StreamController _controller;
StreamSubscription _subscription;
bool cancelOnError;
// Original Stream
Stream<S> _stream;
DuplicateTransformer({bool sync: false, this.cancelOnError}) {
_controller = new StreamController<T>(onListen: _onListen, onCancel: _onCancel, onPause: () {
_subscription.pause();
}, onResume: () {
_subscription.resume();
}, sync: sync);
}
DuplicateTransformer.broadcast({bool sync: false, bool this.cancelOnError}) {
_controller = new StreamController<T>.broadcast(onListen: _onListen, onCancel: _onCancel, sync: sync);
}
void _onListen() {
_subscription = _stream.listen(onData,
onError: _controller.addError,
onDone: _controller.close,
cancelOnError: cancelOnError);
}
void _onCancel() {
_subscription.cancel();
_subscription = null;
}
/**
* Transformation
*/
void onData(S data) {
_controller.add(data);
_controller.add(data); /* DUPLICATE EXAMPLE!! REMOVE FOR YOUR OWN IMPLEMENTATION!! */
}
/**
* Bind
*/
Stream<T> bind(Stream<S> stream) {
this._stream = stream;
return _controller.stream;
}
}
void main() {
// Create StreamController
StreamController controller = new StreamController.broadcast();
// Transform
Stream s = controller.stream.transform(new DuplicateTransformer.broadcast());
s.listen((data) {
print('data: $data');
}).cancel();
s.listen((data) {
print('data2: $data');
}).cancel();
s.listen((data) {
print('data3: $data');
});
// Simulate data
controller.add(1);
controller.add(2);
controller.add(3);
}
让我添加一些注释:
- 在查看其他 dart 内部转换器的源代码时,使用
implements
似乎是正确的方法。
- 我为常规流和广播流实现了两个版本。
- 如果是常规流,您可以直接在新的流控制器上调用 cancel/pause/resumt,因为我们只能收听一次。
- 如果您使用广播流,我发现只有在没有人收听该流时才会调用 listen()。 onCancel 的行为相同。如果最后一个订阅者取消订阅,则调用 onCancel。这就是为什么在这里使用相同的功能是安全的。
https://github.com/dart-lang/sdk/issues/27740#issuecomment-258073139
You can use StreamTransformer.fromHandlers to easily create
transformers that just convert input events to output events.
Example:
new StreamTransformer.fromHandlers(handleData: (String event, EventSink output) {
if (event.startsWith('data:')) {
output.add(JSON.decode(event.substring('data:'.length)));
} else if (event.isNotEmpty) {
output.addError('Unexpected data from CloudBit stream: "$event"');
}
});
如果您想使用这样的函数简单地转换值
int handleData(int data) {
return data * 2;
}
使用 Stream 的 map 方法
stream
.map(handleData)
.listen((data) {
print('data: $data');
});
完整示例:
import 'dart:async';
int handleData(int data) {
return data * 2;
}
void main() {
final controller = StreamController<int>();
controller.stream
.map(handleData)
.listen((data) {
print('data: $data');
});
controller.add(1);
controller.add(2);
controller.add(3);
}
参见dart.dev
上的more examples
与 map
不同,转换器功能更强大,允许您维护内部状态,并在需要时发出值。它可以实现map
做不到的事情,例如延迟、复制值、选择性地省略某些值等
本质上,该实现需要一个 bind
方法来提供基于传入的旧流的新流,以及一个 cast
方法在 [=] 期间帮助 type-checking 30=].
这是一个 over-simplified 示例,它实现了一个将整数值流转换为总和流的“TallyTransformer”。例如,如果到目前为止的输入流有 1, 1, 1, -2, 0, ...
,输出流应该是 1, 2, 3, 1, 1, ...
,即到此为止所有输入的总和。
用法示例:stream.transform(TallyTransformer())
class TallyTransformer implements StreamTransformer {
StreamController _controller = StreamController();
int _sum = 0; // sum of all values so far
@override
Stream bind(Stream stream) {
// start listening on input stream
stream.listen((value) {
_sum += value; // add the new value to sum
_controller.add(_sum); // emit current sum to our listener
});
// return an output stream for our listener
return _controller.stream;
}
@override
StreamTransformer<RS, RT> cast<RS, RT>() {
return StreamTransformer.castFrom(this);
}
}
此示例是 over-simplified(但仍然有效)并且不包括流暂停、恢复或取消等情况。如果您 运行 进入 "Stream has already been listened"
错误,请确保流正在广播。
尝试构建自定义 StreamTransformer class,但是很多示例似乎已经过时,而在文档中找到的不是(某些类型化语言可能会考虑的内容) ) 作为 class(在此处找到:https://api.dartlang.org/apidocs/channels/stable/dartdoc-viewer/dart:async.StreamTransformer)。这看起来不像是一种非常类似于 Dart 的处理方式,而更像是一种类似 Javascript 的方式(我使用 Dart 来避免这种方式)。
许多在线资源说这是创建 StreamTransformer 的方式,但是在扩展它时出现错误。
class exampleStreamTransformer extends StreamTransformer
{
//... (This won't work)
}
'Implements' 似乎是要走的路,同时实现所需的绑定功能:
class exampleStreamTransformer implements StreamTransformer
{
Stream bind(Stream stream)
{
//... (Go on to return new stream, etc)
}
}
我似乎找不到任何这样的例子,但我自己把一些东西放在一起(这在我的 IDE 中被接受,但在运行时不被接受,当它尝试时我得到一个空对象错误使用暂停吸气剂):
class exampleStreamTransformer implements StreamTransformer
{
StreamController<String> _controller;
StreamSubscription<String> _subscription;
Stream bind(Stream stream)
{
_controller = new StreamController<String>(
onListen: ()
{
_subscription = stream.listen((data)
{
// Transform the data.
_controller.add(data);
},
onError: _controller.addError,
onDone: _controller.close,
cancelOnError: true); // Unsure how I'd pass this in?????
},
onPause: _subscription.pause,
onResume: _subscription.resume,
onCancel: _subscription.cancel,
sync: true
);
return _controller.stream;
}
}
想以这种方式实现它,就像 'typed' 生成 class 的方式一样,非常感谢任何帮助,谢谢。
你为什么不用StreamTransformer.fromHandler()
:
import 'dart:async';
void handleData(data, EventSink sink) {
sink.add(data*2);
}
void main() {
StreamTransformer doubleTransformer = new StreamTransformer.fromHandlers(handleData: handleData);
StreamController controller = new StreamController();
controller.stream.transform(doubleTransformer).listen((data) {
print('data: $data');
});
controller.add(1);
controller.add(2);
controller.add(3);
}
输出:
data: 2
data: 4
data: 6
好的。这是另一个工作示例:
import 'dart:async';
class DuplicateTransformer<S, T> implements StreamTransformer<S, T> {
StreamController _controller;
StreamSubscription _subscription;
bool cancelOnError;
// Original Stream
Stream<S> _stream;
DuplicateTransformer({bool sync: false, this.cancelOnError}) {
_controller = new StreamController<T>(onListen: _onListen, onCancel: _onCancel, onPause: () {
_subscription.pause();
}, onResume: () {
_subscription.resume();
}, sync: sync);
}
DuplicateTransformer.broadcast({bool sync: false, bool this.cancelOnError}) {
_controller = new StreamController<T>.broadcast(onListen: _onListen, onCancel: _onCancel, sync: sync);
}
void _onListen() {
_subscription = _stream.listen(onData,
onError: _controller.addError,
onDone: _controller.close,
cancelOnError: cancelOnError);
}
void _onCancel() {
_subscription.cancel();
_subscription = null;
}
/**
* Transformation
*/
void onData(S data) {
_controller.add(data);
_controller.add(data); /* DUPLICATE EXAMPLE!! REMOVE FOR YOUR OWN IMPLEMENTATION!! */
}
/**
* Bind
*/
Stream<T> bind(Stream<S> stream) {
this._stream = stream;
return _controller.stream;
}
}
void main() {
// Create StreamController
StreamController controller = new StreamController.broadcast();
// Transform
Stream s = controller.stream.transform(new DuplicateTransformer.broadcast());
s.listen((data) {
print('data: $data');
}).cancel();
s.listen((data) {
print('data2: $data');
}).cancel();
s.listen((data) {
print('data3: $data');
});
// Simulate data
controller.add(1);
controller.add(2);
controller.add(3);
}
让我添加一些注释:
- 在查看其他 dart 内部转换器的源代码时,使用
implements
似乎是正确的方法。 - 我为常规流和广播流实现了两个版本。
- 如果是常规流,您可以直接在新的流控制器上调用 cancel/pause/resumt,因为我们只能收听一次。
- 如果您使用广播流,我发现只有在没有人收听该流时才会调用 listen()。 onCancel 的行为相同。如果最后一个订阅者取消订阅,则调用 onCancel。这就是为什么在这里使用相同的功能是安全的。
https://github.com/dart-lang/sdk/issues/27740#issuecomment-258073139
You can use StreamTransformer.fromHandlers to easily create transformers that just convert input events to output events.
Example:
new StreamTransformer.fromHandlers(handleData: (String event, EventSink output) { if (event.startsWith('data:')) { output.add(JSON.decode(event.substring('data:'.length))); } else if (event.isNotEmpty) { output.addError('Unexpected data from CloudBit stream: "$event"'); } });
如果您想使用这样的函数简单地转换值
int handleData(int data) {
return data * 2;
}
使用 Stream 的 map 方法
stream
.map(handleData)
.listen((data) {
print('data: $data');
});
完整示例:
import 'dart:async';
int handleData(int data) {
return data * 2;
}
void main() {
final controller = StreamController<int>();
controller.stream
.map(handleData)
.listen((data) {
print('data: $data');
});
controller.add(1);
controller.add(2);
controller.add(3);
}
参见dart.dev
上的more examples与 map
不同,转换器功能更强大,允许您维护内部状态,并在需要时发出值。它可以实现map
做不到的事情,例如延迟、复制值、选择性地省略某些值等
本质上,该实现需要一个 bind
方法来提供基于传入的旧流的新流,以及一个 cast
方法在 [=] 期间帮助 type-checking 30=].
这是一个 over-simplified 示例,它实现了一个将整数值流转换为总和流的“TallyTransformer”。例如,如果到目前为止的输入流有 1, 1, 1, -2, 0, ...
,输出流应该是 1, 2, 3, 1, 1, ...
,即到此为止所有输入的总和。
用法示例:stream.transform(TallyTransformer())
class TallyTransformer implements StreamTransformer {
StreamController _controller = StreamController();
int _sum = 0; // sum of all values so far
@override
Stream bind(Stream stream) {
// start listening on input stream
stream.listen((value) {
_sum += value; // add the new value to sum
_controller.add(_sum); // emit current sum to our listener
});
// return an output stream for our listener
return _controller.stream;
}
@override
StreamTransformer<RS, RT> cast<RS, RT>() {
return StreamTransformer.castFrom(this);
}
}
此示例是 over-simplified(但仍然有效)并且不包括流暂停、恢复或取消等情况。如果您 运行 进入 "Stream has already been listened"
错误,请确保流正在广播。