如何在飞镖中管理来自 StreamSubscription 的多个数据?

How to manage multiple data from a StreamSubscription in dart?

我想从客户端应用程序发送和检索多个数据。

我试过这样做:

(服务器代码)

void start() async{
  try{
    _server = await ServerSocket.bind('127.0.0.1', 1234);
    print('Waiting for connections');
    _server.listen(_handleClient);
  } catch(e) { await _server.close(); }
}

void _handleClient(Socket socket) async {
  late String request, username, password;
  socket.listen((data) => request = String.fromCharCodes(data));
  socket.write(_publicKey);
  socket.listen((data) => _clientPublicKey = _decrypt(String.fromCharCodes(data))); 
  socket.listen((data) => username = _decrypt(String.fromCharCodes(data)));
  socket.listen((data) => password = _decrypt(String.fromCharCodes(data)));

  var database = Database(username, password);

  switch (request) {
    case 'getItem':
      getItem(socket, database);
      break;
    case 'addItem':
      addItem(socket, database);
      break;
      [...]
  }
}

但我有这个例外:

Waiting for connections
Unhandled exception:
Bad state: Stream has already been listened to.
#0      _StreamController._subscribe (dart:async/stream_controller.dart:635:7)
#1      _ControllerStream._createSubscription (dart:async/stream_controller.dart:786:19)
#2      _StreamImpl.listen (dart:async/stream_impl.dart:473:9)
#3      _Socket.listen (dart:io-patch/socket_patch.dart:2026:31)
#4      Server._handleClient (file:///D:/MyApps/Dart/my_netia_server/bin/Network/Server.dart:65:12)
#5      _RootZone.runUnaryGuarded (dart:async/zone.dart:1620:10)
#6      _BufferingStreamSubscription._sendData (dart:async/stream_impl.dart:341:11)
#7      _BufferingStreamSubscription._add (dart:async/stream_impl.dart:271:7)
[14 more...]

Process finished with exit code 255

问题是每次接收到数据时都会调用listen中定义的函数。 我想要的是我的代码仅在必要时才需要数据。这将使我能够更简单地准确管理接收到的数据类型(例如:Object、String、int、...)

如果你想一个一个地访问流的独立事件,当你准备好访问它们时,我推荐 package:async StreamQueue class。

您也可以使用 dart:async 平台库中的 StreamIterator class,但是 StreamQueue 有很大改进 API.

使用示例:

void _handleClient(Socket socket) async {
  var queue = StreamQueue(socket);
  
  String request = String.fromCharCodes(await queue.next);
  socket.write(_publicKey);
  _clientPublicKey = 
      _decrypt(String.fromCharCodes(await queue.next)));
  String username = 
      _decrypt(String.fromCharCodes(await queue.next)));
  String password = 
      _decrypt(String.fromCharCodes(await queue.next)));
  ...

这确实假设各个响应部分作为独立事件出现。这就是原版所做的,但这不是我所依赖的。

您更有可能想要累积字节直到您有足够的字节,然后解析它的初始部分。