将重复的 send/responses 封装到单个异步函数中的同一个 Dart isolate

encapsulate repeated send/responses to the same Dart isolate within a single asyncronous function

是否可以将重复的 send/responses 封装到单个异步函数中的同一个 dart isolate?

背景:

为了设计方便API,我想有一个异步函数return isolate 生成的结果,例如

var ans = await askIsolate(isolateArgs);

如果我直接使用 spawnUri 调用生成的响应,这会很好用,例如

Future<String> askIsolate(Map<String,dynamic> isolateArgs) {

ReceivePort response = new ReceivePort();
var uri = Uri.parse(ISOLATE_URI);

Future<Isolate> remote = Isolate.spawnUri(uri, [JSON.encode(isolateArgs)], response.sendPort);
return remote.then((i) => response.first)
               .catchError((e) { print("Failed to spawn isolate"); })
               .then((msg) => msg.toString());
}

然而,上述方法的缺点是,如果我需要重复调​​用 askIsolate,则每次都必须生成 isolate。

我更愿意与 运行 isolate 通信,这当然可以通过让 isolate return 向调用者发送 sendPort 来实现。但我相信自 2013 Isolate refactoring 以来,这要求调用者在接收端口上收听后续消息,从而无法在单个异步函数中进行封装。

是否有某种机制可以实现我所缺少的?

答案取决于您打算如何使用 isolate

  • 你打算无限期地保留它运行,向它发送输入并期望异步接收答案吗?

  • 是否要一次向 isolate 发送许多(但有限的)输入,期望异步接收答案,然后关闭 isolate?

我猜是后者,你的 askIsolate() 函数需要立即 return 一个 Future 然后在收到所有答案后完成。

await for 循环可用于监听流并使用事件直到它关闭。

我对isolates不熟悉,所以我希望这没问题,我还没有测试过。我假设隔离终止并且响应关闭。

String askIsolate(Map<String,dynamic> isolateArgs) async {

  ReceivePort response = new ReceivePort();
  var uri = Uri.parse(ISOLATE_URI);

  Isolate.spawnUri(uri, [JSON.encode(isolateArgs)], response.sendPort)
    .catchError((e)) {
     throw ...;
   });

  List<String> answers = new List<String>;

  await for(var answer in response) {
    out.add(answer.toString());
  }

  return answers;
}

注:

  • response 是您正在收听答案的流。它是在 生成 isolate 之前创建的,因此您不需要(并且可能不应该)等待 isolate future 完成后再收听它。

  • 我做了 askIsolate() async 因为这样可以很容易地立即 return 一个在函数 returns 时完成的 future - 没有那些乏味的处理关于 .then(...) 链,我个人觉得这些链令人困惑且难以阅读。

顺便说一句,你原来的then(...).catchError(...)风格的代码最好这样写:

  Isolate.spawnUri(uri, [JSON.encode(isolateArgs)], response.sendPort)
    .catchError((e) { ... });

   return response.first)
     .then((msg) => msg.toString());

我认为在隔离创建后延迟将 catchError 处理程序附加到行可能允许未来在处理程序到位之前完成错误。

参见:https://www.dartlang.org/articles/futures-and-error-handling/#potential-problem-failing-to-register-error-handlers-early .

我还建议查看 package:isolate 中的 IsolateRunner,它旨在解决这样的问题 - 在创建隔离时多次调用同一个隔离中的函数,而不是只调用一次。

如果您不想那样,还有其他更原始的选项

异步函数可以等待期货或流,ReceivePort 是一个流。 对于快速 hack,您可以在响应流上使用 await for 做一些事情,但这不是很方便。

ReceivePort 包装在 package:asyncStreamQueue 中是更好的选择。这使您可以将单个事件转换为期货。类似于:

myFunc() async {  
  var responses = new ReceivePort();
  var queue = new StreamQueue(responses);
  // queryFunction sends its own SendPort on the port you pass to it.
  var isolate = await isolate.spawn(queryFunction, [], responses.sendPort);
  var queryPort = await queue.next();
  for (var something in somethingToDo) {
    queryPort.send(something);
    var response = await queue.next();
    doSomethingWithIt(response);
  }
  queryPort.send("shutdown command");  
  // or isolate.kill(), but it's better to shut down cleanly.
  responses.close();  // Don't forget to close the receive port.
}

下面是一个基于 lrn 上面评论的快速工作示例。该示例通过 spawnURI 初始化一个 isolate,然后通过传递一个新的 ReceivePort 来与 isolate 通信,在该 ReceivePort 上需要一个回复。这允许 askIsolate 直接 return 来自 运行 spawnURI 隔离的响应。

为清楚起见,省略了注意错误处理。

隔离码:

import 'dart:isolate';
import 'dart:convert' show JSON;

main(List<String> initArgs, SendPort replyTo) async {
  ReceivePort receivePort = new ReceivePort();
  replyTo.send(receivePort.sendPort);

  receivePort.listen((List<dynamic> callArgs) async {
    SendPort thisResponsePort = callArgs.removeLast(); //last arg must be the offered sendport
    thisResponsePort.send("Map values: " + JSON.decode(callArgs[0]).values.join(","));
  });
}

调用代码:

import 'dart:async';
import 'dart:isolate';
import 'dart:convert';


const String ISOLATE_URI = "http://localhost/isolates/test_iso.dart";
SendPort isolateSendPort = null;

Future<SendPort> initIsolate(Uri uri) async {
    ReceivePort response = new ReceivePort();
    await Isolate.spawnUri(uri, [], response.sendPort, errorsAreFatal: true);
    print("Isolate spawned from $ISOLATE_URI");
    return await response.first;
}


Future<dynamic> askIsolate(Map<String,String> args) async {
  if (isolateSendPort == null) {
    print("ERROR: Isolate has not yet been spawned");
    isolateSendPort = await initIsolate(Uri.parse(ISOLATE_URI)); //try again
  }

  //Send args to the isolate, along with a receiveport upon which we listen for first response 
  ReceivePort response = new ReceivePort();
  isolateSendPort.send([JSON.encode(args), response.sendPort]);
  return await response.first;
}

main() async {
  isolateSendPort = await initIsolate(Uri.parse(ISOLATE_URI));

  askIsolate({ 'foo':'bar', 'biz':'baz'}).then(print);
  askIsolate({ 'zab':'zib', 'rab':'oof'}).then(print);
  askIsolate({ 'One':'Thanks', 'Two':'lrn'}).then(print);
}  

输出

Isolate spawned from http://localhost/isolates/test_iso.dart
Map values: bar,baz
Map values: zib,oof
Map values: Thanks,lrn