flutter 如何并行处理 bloc 事件?

flutter How to process bloc events in parallel?

考虑一个用于对颜色进行计数的应用程序。

我已经建立了一个 BLoC 来管理“颜色计数器”。

/// BLoC that manages the "color counter" state.
///
/// A [ColorsFetchRequested] event loads the color list from the repository;
/// a [ColorCounted] event stores one click and then reloads the list.
class ColorsBloc extends Bloc<ColorsEvent, ColorsState> {
  ColorsBloc({required this.colorRepository}) : super(ColorsState.initial());

  /// Repository used to store clicks and to fetch the color list.
  final ColorRepository colorRepository;

  @override
  Stream<ColorsState> mapEventToState(ColorsEvent event) async* {
    if (event is ColorsFetchRequested) {
      yield ColorsState.loading();
      ColorsState outcome;
      try {
        outcome = ColorsState.success(await colorRepository.getColors());
      } catch (_) {
        // Any failure while fetching is reported as the generic error state.
        outcome = ColorsState.error();
      }
      yield outcome;
      return;
    }
    if (event is ColorCounted) {
      yield* _mapColorCountedToState(event);
    }
  }

  /// Handles one click: flags the color as sending, stores the click,
  /// reloads the list and finally clears the flag again.
  Stream<ColorsState> _mapColorCountedToState(ColorCounted event) async* {
    final clickedId = event.colorId;

    // Flag the color as "sending" before any repository call is made.
    yield state.copyWith(
      sendingByColorId: {...state.sendingByColorId, clickedId},
    );

    final click = Color(
      colorId: clickedId,
      timestamp: DateTime.now().millisecondsSinceEpoch,
    );
    // Both awaits complete before this handler finishes.
    await colorRepository.storeColor(click);
    final refreshed = await colorRepository.getColors();

    yield state.copyWith(
      status: Status.success,
      colors: refreshed,
      sendingByColorId: {...state.sendingByColorId}..remove(clickedId),
    );
  }
}

发送颜色点击需要一些时间(假设在慢速网络上需要 1 秒)。在该颜色被存储到服务器之前,用户不得再次点击同一颜色(这正是 sendingByColorId 集合所跟踪的内容)。

问题

然而,用户可能会非常快速地点击不同的颜色。在这种情况下计数器仍然可以工作,但会出现滞后,因为事件是按先进先出的顺序处理的(包括 await colorRepository.storeColor(...) 以及用于获取更新后颜色列表的那个 await)。

我希望发送状态在任何点击后立即更新,即使之前的点击当前正在将其存储到存储库中也是如此。

如何让 BLoC 在另一个事件等待 API 响应时继续处理新事件?

请注意,使用 Bloc 的主要思想是可预测性(predictability)——并行处理事件会在一定程度上失去这种可预测性(取决于您的具体实现)。如果您使用的是 flutter_bloc,可以按照这个建议,覆盖 bloc 上默认的事件流处理。

/// Replaces the bloc's default event-stream handling by mapping the incoming
/// events through `flatMap`.
///
/// NOTE(review): `flatMap` is not a member of the core `Stream` class; it is
/// presumably the rxdart extension, so that package must be imported - confirm.
@override
Stream<Transition<MyEvent, MyState>> transformEvents(
  Stream<MyEvent> events,
  transitionFn,
) =>
    events.flatMap(transitionFn);

您也可以了解一下 isolate,尤其是 Flutter 的 compute 函数,它可以启动一个 isolate 来运行您的代码。我发现这是一个不错的参考资料。

虽然我非常确定有更好的方法可以做到这一点,但我想出了以下方法。我删除了您的一些逻辑,使其更通用一些。

我不熟悉 Dart 中 compute 和 isolate 的性能细节,所以需要声明:这可能不是最佳实践,但也许它可以帮助您入门。

import 'package:bloc/bloc.dart';
import 'package:flutter/foundation.dart';
import 'package:flutter/material.dart';
import 'dart:async';

/// Application entry point: mounts [ExampleApp] as the root widget.
void main() => runApp(ExampleApp());

/// Root widget of the example: a single button that adds an
/// [ExampleStartingEvent] to [bloc] when pressed.
class ExampleApp extends StatelessWidget {
  /// Bloc instance shared through this static field.
  static ExampleBloc bloc = ExampleBloc();

  /// Button callback; reads [bloc] at press time.
  void _trigger() => bloc.add(ExampleStartingEvent());

  @override
  Widget build(BuildContext context) {
    final triggerButton = TextButton(
      onPressed: _trigger,
      child: Text("Trigger"),
    );
    return MaterialApp(home: triggerButton);
  }
}

// Top level function that is computed in isolate

/// Simulates a slow request and returns its response text.
///
/// This function is handed to [compute], so it runs in a separate isolate.
/// Isolates do not share static state: touching `ExampleApp.bloc` from here
/// would reach a different bloc instance than the one the UI uses. The
/// response is therefore returned to the caller, which dispatches the
/// follow-up event on its own isolate.
Future<String> _mockRequest(String body) async {
  // Do your async request here and await response
  // The delay must be awaited, otherwise the function completes immediately.
  await Future.delayed(const Duration(seconds: 5));
  return "Successful!";
}

// Bloc

/// Bloc that starts a background request without blocking its event queue
/// and emits [ExampleStateSuccess] once the response has arrived.
class ExampleBloc extends Bloc<ExampleEvent, ExampleState> {
  ExampleBloc() : super(ExampleStateInitial());

  /// Adds an [ExampleEventSuccess] carrying [response] to `ExampleApp.bloc`.
  ///
  /// Kept for backward compatibility. It only has the intended effect when
  /// called on the isolate that owns the UI's bloc; the bloc itself no longer
  /// relies on it.
  static void successfulCompute(String response) {
    ExampleApp.bloc.add(ExampleEventSuccess(response));
  }

  @override
  Stream<ExampleState> mapEventToState(
    ExampleEvent event,
  ) async* {
    if (event is ExampleEventSuccess) {
      print(event.response);
      yield ExampleStateSuccess(event.response);
    }
    if (event is ExampleStartingEvent) {
      // Deliberately not awaited: awaiting here would keep the event queue
      // busy until the request finishes. The result comes back as an event.
      _runRequest();
    }
  }

  /// Runs [_mockRequest] in a background isolate and feeds the outcome back
  /// into this bloc.
  ///
  /// Nobody awaits the returned future, so every failure is caught here and
  /// forwarded to [addError] instead of becoming an unhandled async error.
  Future<void> _runRequest() async {
    try {
      final response = await compute(_mockRequest, "body");
      add(ExampleEventSuccess(response));
    } catch (error, stackTrace) {
      addError(error, stackTrace);
    }
  }
}

// Events

/// Base type of every event handled by the example bloc.
class ExampleEvent {}

/// Event that asks the bloc to start the background request.
class ExampleStartingEvent extends ExampleEvent {}

/// Event carrying the response of a finished request.
class ExampleEventSuccess extends ExampleEvent {
  ExampleEventSuccess(this.response);

  /// Response payload; left untyped (dynamic), exactly as before.
  final dynamic response;
}

// States

/// Base type of every state emitted by the example bloc.
class ExampleState {}

/// State the bloc starts in.
class ExampleStateInitial extends ExampleState {}

/// State carrying the response of a finished request.
class ExampleStateSuccess extends ExampleState {
  ExampleStateSuccess(this.response);

  /// Response payload; left untyped (dynamic), exactly as before.
  final dynamic response;
}

/// State representing a failure.
class ExampleStateError extends ExampleState {}

只是展示一个基于@kohjakob 的提议的解决方案,但是:

  • 没有静态方法
  • 完整的错误处理例程

这个想法基本上是将存储库调用包装到一个 async 方法 (_sendClick(...)) 中,并以非阻塞方式调用它(即不使用 await),而发送状态的更新则是同步完成的。

_sendClick(...) 等待存储库返回,并在完成后将 ColorSendSuccess 或 ColorSendFailed 事件添加到 bloc。然后,这些事件会在 mapEventToState(...) 例程各自的一次运行中得到处理。

/// BLoC that manages the color counters without blocking its event queue
/// while a click is being stored.
///
/// A [ColorCounted] event only flags the color as "sending" and starts the
/// store call in the background. The outcome comes back as a
/// [ColorSendSuccess] or [ColorSendFailed] event, which gets its own run of
/// [mapEventToState].
class ColorsBloc extends Bloc<ColorsEvent, ColorsState> {
  /// Repository used to store clicks and to fetch the color list.
  final ColorRepository colorRepository;

  ColorsBloc({required this.colorRepository}) : super(ColorsState.initial());

  @override
  Stream<ColorsState> mapEventToState(
    ColorsEvent event,
  ) async* {
    if (event is ColorsFetchRequested) {
      yield ColorsState.loading();
      try {
        final colors = await colorRepository.getColors();
        yield ColorsState.success(colors);
      } catch (_) {
        yield ColorsState.error();
      }
    } else if (event is ColorCounted) {
      yield* _mapColorCountedToState(event);
    } else if (event is ColorSendSuccess) {
      yield _mapColorSendSuccessToState(event);
    } else if (event is ColorSendFailed) {
      yield _mapColorSendFailedToState(event);
    }
  }

  /// Flags the clicked color as "sending" and starts storing the click.
  ///
  /// Nothing is awaited here, so the next event can be handled immediately.
  /// The flag is cleared only by the success/failure handlers, i.e. once the
  /// repository has actually answered.
  Stream<ColorsState> _mapColorCountedToState(ColorCounted event) async* {
    yield state.copyWith(
      sendingByColorId: {...state.sendingByColorId, event.colorId},
    );
    // non-blocking: intentionally not awaited, the result arrives as an event.
    _sendClick(Color(
      colorId: event.colorId,
      timestamp: DateTime.now().millisecondsSinceEpoch,
    ));
  }

  /// Stores [color] and reports the outcome back to the bloc as an event.
  ///
  /// The returned future is never awaited by the caller, so every exception
  /// has to be turned into a [ColorSendFailed] event here; otherwise the
  /// color would stay flagged as "sending" forever.
  Future<void> _sendClick(Color color) async {
    try {
      final int newId = await colorRepository.storeColor(color);
      final storedColor = color.copyWith(id: () => newId);
      add(ColorSendSuccess(color: storedColor));
    } on StoreColorClickException {
      add(ColorSendFailed(color: color));
    } on Exception catch (error, stackTrace) {
      // Unexpected failure: surface it through the bloc's error channel and
      // still release the color.
      addError(error, stackTrace);
      add(ColorSendFailed(color: color));
    }
  }

  /// Returns the state after a click was stored successfully.
  ///
  /// NOTE(review): matching on `localId` assumes every pending click carries
  /// a unique, non-null `localId`; the `Color` built in
  /// [_mapColorCountedToState] does not set one - confirm.
  ColorsState _mapColorSendSuccessToState(ColorSendSuccess event) {
    final stored = event.color;
    return state.copyWith(
      colors: [
        // replace the local color-click with the stored one
        for (final color in state.colors)
          if (color.localId != stored.localId) color,
        stored.copyWith(localId: () => null),
      ],
      // The flag was added under `colorId`, so it is removed under the same
      // key (assumes `Color` exposes the `colorId` it was constructed with).
      sendingByColorId: {...state.sendingByColorId}..remove(stored.colorId),
    );
  }

  /// Returns the state after storing a click has failed.
  ColorsState _mapColorSendFailedToState(ColorSendFailed event) {
    final failed = event.color;
    return state.copyWith(
      colors: [
        // remove the color that has not been stored
        for (final color in state.colors)
          if (color.localId != failed.localId) color,
      ],
      // Same key as used when the flag was set.
      sendingByColorId: {...state.sendingByColorId}..remove(failed.colorId),
      // mark the color as failed
      errorByColorId: {...state.errorByColorId, failed.colorId},
    );
  }
}