将重复的 send/responses 封装到单个异步函数中的同一个 Dart isolate
encapsulate repeated send/responses to the same Dart isolate within a single asyncronous function
是否可以将重复的 send/responses 封装到单个异步函数中的同一个 dart isolate?
背景:
为了设计方便API,我想有一个异步函数return isolate 生成的结果,例如
var ans = await askIsolate(isolateArgs);
如果我直接使用 spawnUri 调用生成的响应,这会很好用,例如
Future<String> askIsolate(Map<String,dynamic> isolateArgs) {
ReceivePort response = new ReceivePort();
var uri = Uri.parse(ISOLATE_URI);
Future<Isolate> remote = Isolate.spawnUri(uri, [JSON.encode(isolateArgs)], response.sendPort);
return remote.then((i) => response.first)
.catchError((e) { print("Failed to spawn isolate"); })
.then((msg) => msg.toString());
}
然而,上述方法的缺点是,如果我需要重复调用 askIsolate,则每次都必须生成 isolate。
我更愿意与 运行 isolate 通信,这当然可以通过让 isolate return 向调用者发送 sendPort 来实现。但我相信自 2013 Isolate refactoring 以来,这要求调用者在接收端口上收听后续消息,从而无法在单个异步函数中进行封装。
是否有某种机制可以实现我所缺少的?
答案取决于您打算如何使用 isolate
你打算无限期地保留它运行,向它发送输入并期望异步接收答案吗?
是否要一次向 isolate 发送许多(但有限的)输入,期望异步接收答案,然后关闭 isolate?
我猜是后者,你的 askIsolate()
函数需要立即 return 一个 Future
然后在收到所有答案后完成。
await for
循环可用于监听流并使用事件直到它关闭。
我对isolates不熟悉,所以我希望这没问题,我还没有测试过。我假设隔离终止并且响应关闭。
String askIsolate(Map<String,dynamic> isolateArgs) async {
ReceivePort response = new ReceivePort();
var uri = Uri.parse(ISOLATE_URI);
Isolate.spawnUri(uri, [JSON.encode(isolateArgs)], response.sendPort)
.catchError((e)) {
throw ...;
});
List<String> answers = new List<String>;
await for(var answer in response) {
out.add(answer.toString());
}
return answers;
}
注:
response
是您正在收听答案的流。它是在 在 生成 isolate 之前创建的,因此您不需要(并且可能不应该)等待 isolate future 完成后再收听它。
我做了 askIsolate()
async 因为这样可以很容易地立即 return 一个在函数 returns 时完成的 future - 没有那些乏味的处理关于 .then(...)
链,我个人觉得这些链令人困惑且难以阅读。
顺便说一句,你原来的then(...).catchError(...)
风格的代码最好这样写:
Isolate.spawnUri(uri, [JSON.encode(isolateArgs)], response.sendPort)
.catchError((e) { ... });
return response.first)
.then((msg) => msg.toString());
我认为在隔离创建后延迟将 catchError 处理程序附加到行可能允许未来在处理程序到位之前完成错误。
我还建议查看 package:isolate
中的 IsolateRunner
,它旨在解决这样的问题 - 在创建隔离时多次调用同一个隔离中的函数,而不是只调用一次。
如果您不想那样,还有其他更原始的选项
异步函数可以等待期货或流,ReceivePort
是一个流。
对于快速 hack,您可以在响应流上使用 await for
做一些事情,但这不是很方便。
将 ReceivePort
包装在 package:async
的 StreamQueue
中是更好的选择。这使您可以将单个事件转换为期货。类似于:
myFunc() async {
var responses = new ReceivePort();
var queue = new StreamQueue(responses);
// queryFunction sends its own SendPort on the port you pass to it.
var isolate = await isolate.spawn(queryFunction, [], responses.sendPort);
var queryPort = await queue.next();
for (var something in somethingToDo) {
queryPort.send(something);
var response = await queue.next();
doSomethingWithIt(response);
}
queryPort.send("shutdown command");
// or isolate.kill(), but it's better to shut down cleanly.
responses.close(); // Don't forget to close the receive port.
}
下面是一个基于 lrn 上面评论的快速工作示例。该示例通过 spawnURI 初始化一个 isolate,然后通过传递一个新的 ReceivePort 来与 isolate 通信,在该 ReceivePort 上需要一个回复。这允许 askIsolate 直接 return 来自 运行 spawnURI 隔离的响应。
为清楚起见,省略了注意错误处理。
隔离码:
import 'dart:isolate';
import 'dart:convert' show JSON;
main(List<String> initArgs, SendPort replyTo) async {
ReceivePort receivePort = new ReceivePort();
replyTo.send(receivePort.sendPort);
receivePort.listen((List<dynamic> callArgs) async {
SendPort thisResponsePort = callArgs.removeLast(); //last arg must be the offered sendport
thisResponsePort.send("Map values: " + JSON.decode(callArgs[0]).values.join(","));
});
}
调用代码:
import 'dart:async';
import 'dart:isolate';
import 'dart:convert';
const String ISOLATE_URI = "http://localhost/isolates/test_iso.dart";
SendPort isolateSendPort = null;
Future<SendPort> initIsolate(Uri uri) async {
ReceivePort response = new ReceivePort();
await Isolate.spawnUri(uri, [], response.sendPort, errorsAreFatal: true);
print("Isolate spawned from $ISOLATE_URI");
return await response.first;
}
Future<dynamic> askIsolate(Map<String,String> args) async {
if (isolateSendPort == null) {
print("ERROR: Isolate has not yet been spawned");
isolateSendPort = await initIsolate(Uri.parse(ISOLATE_URI)); //try again
}
//Send args to the isolate, along with a receiveport upon which we listen for first response
ReceivePort response = new ReceivePort();
isolateSendPort.send([JSON.encode(args), response.sendPort]);
return await response.first;
}
main() async {
isolateSendPort = await initIsolate(Uri.parse(ISOLATE_URI));
askIsolate({ 'foo':'bar', 'biz':'baz'}).then(print);
askIsolate({ 'zab':'zib', 'rab':'oof'}).then(print);
askIsolate({ 'One':'Thanks', 'Two':'lrn'}).then(print);
}
输出
Isolate spawned from http://localhost/isolates/test_iso.dart
Map values: bar,baz
Map values: zib,oof
Map values: Thanks,lrn
是否可以将重复的 send/responses 封装到单个异步函数中的同一个 dart isolate?
背景:
为了设计方便API,我想有一个异步函数return isolate 生成的结果,例如
var ans = await askIsolate(isolateArgs);
如果我直接使用 spawnUri 调用生成的响应,这会很好用,例如
Future<String> askIsolate(Map<String,dynamic> isolateArgs) {
ReceivePort response = new ReceivePort();
var uri = Uri.parse(ISOLATE_URI);
Future<Isolate> remote = Isolate.spawnUri(uri, [JSON.encode(isolateArgs)], response.sendPort);
return remote.then((i) => response.first)
.catchError((e) { print("Failed to spawn isolate"); })
.then((msg) => msg.toString());
}
然而,上述方法的缺点是,如果我需要重复调用 askIsolate,则每次都必须生成 isolate。
我更愿意与 运行 isolate 通信,这当然可以通过让 isolate return 向调用者发送 sendPort 来实现。但我相信自 2013 Isolate refactoring 以来,这要求调用者在接收端口上收听后续消息,从而无法在单个异步函数中进行封装。
是否有某种机制可以实现我所缺少的?
答案取决于您打算如何使用 isolate
你打算无限期地保留它运行,向它发送输入并期望异步接收答案吗?
是否要一次向 isolate 发送许多(但有限的)输入,期望异步接收答案,然后关闭 isolate?
我猜是后者,你的 askIsolate()
函数需要立即 return 一个 Future
然后在收到所有答案后完成。
await for
循环可用于监听流并使用事件直到它关闭。
我对isolates不熟悉,所以我希望这没问题,我还没有测试过。我假设隔离终止并且响应关闭。
String askIsolate(Map<String,dynamic> isolateArgs) async {
ReceivePort response = new ReceivePort();
var uri = Uri.parse(ISOLATE_URI);
Isolate.spawnUri(uri, [JSON.encode(isolateArgs)], response.sendPort)
.catchError((e)) {
throw ...;
});
List<String> answers = new List<String>;
await for(var answer in response) {
out.add(answer.toString());
}
return answers;
}
注:
response
是您正在收听答案的流。它是在 在 生成 isolate 之前创建的,因此您不需要(并且可能不应该)等待 isolate future 完成后再收听它。我做了
askIsolate()
async 因为这样可以很容易地立即 return 一个在函数 returns 时完成的 future - 没有那些乏味的处理关于.then(...)
链,我个人觉得这些链令人困惑且难以阅读。
顺便说一句,你原来的then(...).catchError(...)
风格的代码最好这样写:
Isolate.spawnUri(uri, [JSON.encode(isolateArgs)], response.sendPort)
.catchError((e) { ... });
return response.first)
.then((msg) => msg.toString());
我认为在隔离创建后延迟将 catchError 处理程序附加到行可能允许未来在处理程序到位之前完成错误。
我还建议查看 package:isolate
中的 IsolateRunner
,它旨在解决这样的问题 - 在创建隔离时多次调用同一个隔离中的函数,而不是只调用一次。
如果您不想那样,还有其他更原始的选项
异步函数可以等待期货或流,ReceivePort
是一个流。
对于快速 hack,您可以在响应流上使用 await for
做一些事情,但这不是很方便。
将 ReceivePort
包装在 package:async
的 StreamQueue
中是更好的选择。这使您可以将单个事件转换为期货。类似于:
myFunc() async {
var responses = new ReceivePort();
var queue = new StreamQueue(responses);
// queryFunction sends its own SendPort on the port you pass to it.
var isolate = await isolate.spawn(queryFunction, [], responses.sendPort);
var queryPort = await queue.next();
for (var something in somethingToDo) {
queryPort.send(something);
var response = await queue.next();
doSomethingWithIt(response);
}
queryPort.send("shutdown command");
// or isolate.kill(), but it's better to shut down cleanly.
responses.close(); // Don't forget to close the receive port.
}
下面是一个基于 lrn 上面评论的快速工作示例。该示例通过 spawnURI 初始化一个 isolate,然后通过传递一个新的 ReceivePort 来与 isolate 通信,在该 ReceivePort 上需要一个回复。这允许 askIsolate 直接 return 来自 运行 spawnURI 隔离的响应。
为清楚起见,省略了注意错误处理。
隔离码:
import 'dart:isolate';
import 'dart:convert' show JSON;
main(List<String> initArgs, SendPort replyTo) async {
ReceivePort receivePort = new ReceivePort();
replyTo.send(receivePort.sendPort);
receivePort.listen((List<dynamic> callArgs) async {
SendPort thisResponsePort = callArgs.removeLast(); //last arg must be the offered sendport
thisResponsePort.send("Map values: " + JSON.decode(callArgs[0]).values.join(","));
});
}
调用代码:
import 'dart:async';
import 'dart:isolate';
import 'dart:convert';
const String ISOLATE_URI = "http://localhost/isolates/test_iso.dart";
SendPort isolateSendPort = null;
Future<SendPort> initIsolate(Uri uri) async {
ReceivePort response = new ReceivePort();
await Isolate.spawnUri(uri, [], response.sendPort, errorsAreFatal: true);
print("Isolate spawned from $ISOLATE_URI");
return await response.first;
}
Future<dynamic> askIsolate(Map<String,String> args) async {
if (isolateSendPort == null) {
print("ERROR: Isolate has not yet been spawned");
isolateSendPort = await initIsolate(Uri.parse(ISOLATE_URI)); //try again
}
//Send args to the isolate, along with a receiveport upon which we listen for first response
ReceivePort response = new ReceivePort();
isolateSendPort.send([JSON.encode(args), response.sendPort]);
return await response.first;
}
main() async {
isolateSendPort = await initIsolate(Uri.parse(ISOLATE_URI));
askIsolate({ 'foo':'bar', 'biz':'baz'}).then(print);
askIsolate({ 'zab':'zib', 'rab':'oof'}).then(print);
askIsolate({ 'One':'Thanks', 'Two':'lrn'}).then(print);
}
输出
Isolate spawned from http://localhost/isolates/test_iso.dart
Map values: bar,baz
Map values: zib,oof
Map values: Thanks,lrn