flutter 如何并行处理 bloc 事件?
flutter How to process bloc events in parallel?
考虑一个用于计算颜色的应用程序。
- 服务器提供颜色列表。
- 用户可以在应用程序中点击颜色 UI
- 计算每种颜色的点击次数,每次点击都存储在服务器上。
我已经建立了一个 BLoC 来管理“颜色计数器”。
class ColorsBloc extends Bloc<ColorsEvent, ColorsState> {
final ColorRepository colorRepository;
ColorsBloc({required this.colorRepository}) : super(ColorsState.initial());
@override
Stream<ColorsState> mapEventToState(
ColorsEvent event,
) async* {
if (event is ColorsFetchRequested) {
yield ColorsState.loading();
try {
final colors = await colorRepository.getColors();
yield ColorsState.success(colors);
} catch (e) {
yield ColorsState.error();
}
} else if (event is ColorCounted) {
yield* _mapColorCountedToState(event);
}
}
Stream<ColorsState> _mapColorCountedToState(ColorCounted event) async* {
yield state.copyWith(
sendingByColorId: {...state.sendingByColorId, event.colorId},
);
await colorRepository.storeColor(Color(
colorId: event.colorId,
timestamp: DateTime.now().millisecondsSinceEpoch,
));
final colors = await colorRepository.getColors();
yield state.copyWith(
status: Status.success,
colors: colors,
sendingByColorId: {...state.sendingByColorId}..remove(event.colorId),
);
}
}
发送颜色点击需要一些时间(假设在慢速网络上需要 1 秒)。在将颜色存储到服务器之前,用户不得再次单击该颜色(sendingByColorId
设置跟踪的内容)。
问题
然而,用户可能会非常快速地点击不同的颜色。计数器在那种情况下工作,但它们落后了,因为事件是先进先出处理的(包括 await colorRepository.storeColor(...)
和 await
以获取更新的颜色列表)。
我希望发送状态在任何点击后立即更新,即使之前的点击当前正在将其存储到存储库中也是如此。
如何让 BLoC 在另一个事件等待 API 响应时继续处理新事件?
请注意 使用 Bloc 的主要思想是 predictability - you will lose that predictability to some degree (depending on your concrete implementation). If you are using flutter_bloc
you could follow this suggestion 并覆盖您的 bloc 上的默认事件流处理。
@override
Stream<Transition<MyEvent, MyState>> transformEvents(
Stream<MyEvent> events, transitionFn) {
return events.flatMap(transitionFn);
}
您也可以查看 isolates and maybe especially flutters compute which lets you spin up an isolate to run your code. I found this 作为一个好的来源。
虽然我非常确定有更好的方法可以做到这一点,但我想出了以下方法。我删除了您的一些逻辑,使其更通用一些。
我不熟悉 dart 中 compute
和 isolate
的性能细节,所以我想声明这可能不是最佳实践方法,但 也许它可以帮助您入门。
import 'package:bloc/bloc.dart';
import 'package:flutter/foundation.dart';
import 'package:flutter/material.dart';
import 'dart:async';
void main() {
runApp(ExampleApp());
}
class ExampleApp extends StatelessWidget {
static ExampleBloc bloc = ExampleBloc();
@override
Widget build(BuildContext context) {
return MaterialApp(
home: TextButton(
onPressed: () => bloc.add(ExampleStartingEvent()),
child: Text("Trigger"),
),
);
}
}
// Top level function that is computed in isolate
Future<void> _mockRequest(String body) async {
// Do your async request here and await response
Future.delayed(Duration(seconds: 5));
ExampleBloc.successfulCompute("Successful!");
}
// Bloc
class ExampleBloc extends Bloc<ExampleEvent, ExampleState> {
ExampleBloc() : super(ExampleStateInitial());
static successfulCompute(String response) {
ExampleApp.bloc.add(ExampleEventSuccess(response));
}
@override
Stream<ExampleState> mapEventToState(
ExampleEvent event,
) async* {
if (event is ExampleEventSuccess) {
print(event.response);
yield ExampleStateSuccess(event.response);
}
if (event is ExampleStartingEvent) {
compute(_mockRequest, "body");
}
}
}
// Events
class ExampleEvent {}
class ExampleStartingEvent extends ExampleEvent {}
class ExampleEventSuccess extends ExampleEvent {
final response;
ExampleEventSuccess(this.response);
}
// States
class ExampleState {}
class ExampleStateInitial extends ExampleState {}
class ExampleStateSuccess extends ExampleState {
final response;
ExampleStateSuccess(this.response);
}
class ExampleStateError extends ExampleState {}
只是展示一个基于@kohjakob 的提议的解决方案,但是:
- 没有静态方法
- 完整的错误处理例程
这个想法基本上是将存储库调用包装到一个 async
方法 (_sendClick(...)
) 中,并将其称为非阻塞(即没有 await
),同时状态更新发送状态是同步完成的。
_sendClick(...)
等待存储库并在完成后将 ColorSendSuccess
或 ColorSendFailed
事件添加到 bloc。然后,这些事件在 mapEventToState(...)
例程的 运行 中处理。
class ColorsBloc extends Bloc<ColorsEvent, ColorsState> {
final ColorRepository colorRepository;
ColorsBloc({required this.colorRepository}) : super(ColorsState.initial());
@override
Stream<ColorsState> mapEventToState(
ColorsEvent event,
) async* {
if (event is ColorsFetchRequested) {
yield ColorsState.loading();
try {
final colors = await colorRepository.getColors();
yield ColorsState.success(colors);
} catch (e) {
yield ColorsState.error();
}
} else if (event is ColorCounted) {
yield* _mapColorCountedToState(event);
} else if (event is ColorSendSuccess) {
yield _mapColorSendSuccessToState(event);
} else if (event is ColorSendFailed) {
yield _mapColorSendFailedToState(event);
}
}
Stream<ColorsState> _mapColorCountedToState(ColorCounted event) async* {
yield state.copyWith(
sendingByColorId: {...state.sendingByColorId, event.colorId},
);
// non-blocking <----------------
_sendClick(Color(
colorId: event.colorId,
timestamp: DateTime.now().millisecondsSinceEpoch,
));
final colors = await colorRepository.getColors();
yield state.copyWith(
status: Status.success,
colors: colors,
sendingByColorId: {...state.sendingByColorId}..remove(event.colorId),
);
}
Future<void> _sendClick(Color color) async {
try {
int newId = await colorRepository.storeColor(color);
Color storedColor = color.copyWith(id: () => newId);
add(ColorSendSuccess(color: storedColor));
} on StoreColorClickException catch (_) {
add(ColorSendFailed(color: color));
}
}
ColorsState _mapColorSendSuccessToState(ColorCounted event) async* {
return state.copyWith(
colors: [...state.colors]
// replace the local color-click with the stored one
..removeWhere((element) => element.localId == event.color.localId)
..add(event.color.copyWith(localId: () => null)),
sendingByColorId: {...state.sendingByColorId}..remove(event.color.id),
);
}
ColorsState _mapColorSendFailedToState(ColorCounted event) async* {
return state.copyWith(
colors: [...state.colors]
// remove the color that has not been stored
..removeWhere((element) => element.localId == event.color.localId),
sendingByColorId: {...state.sendingByColorId}..remove(event.color.localId),
// mark the color as failed
errorByColorId: {...state.errorByColorId, event.color.localId},
);
}
}
考虑一个用于计算颜色的应用程序。
- 服务器提供颜色列表。
- 用户可以在应用程序中点击颜色 UI
- 计算每种颜色的点击次数,每次点击都存储在服务器上。
我已经建立了一个 BLoC 来管理“颜色计数器”。
class ColorsBloc extends Bloc<ColorsEvent, ColorsState> {
final ColorRepository colorRepository;
ColorsBloc({required this.colorRepository}) : super(ColorsState.initial());
@override
Stream<ColorsState> mapEventToState(
ColorsEvent event,
) async* {
if (event is ColorsFetchRequested) {
yield ColorsState.loading();
try {
final colors = await colorRepository.getColors();
yield ColorsState.success(colors);
} catch (e) {
yield ColorsState.error();
}
} else if (event is ColorCounted) {
yield* _mapColorCountedToState(event);
}
}
Stream<ColorsState> _mapColorCountedToState(ColorCounted event) async* {
yield state.copyWith(
sendingByColorId: {...state.sendingByColorId, event.colorId},
);
await colorRepository.storeColor(Color(
colorId: event.colorId,
timestamp: DateTime.now().millisecondsSinceEpoch,
));
final colors = await colorRepository.getColors();
yield state.copyWith(
status: Status.success,
colors: colors,
sendingByColorId: {...state.sendingByColorId}..remove(event.colorId),
);
}
}
发送颜色点击需要一些时间(假设在慢速网络上需要 1 秒)。在将颜色存储到服务器之前,用户不得再次单击该颜色(sendingByColorId
设置跟踪的内容)。
问题
然而,用户可能会非常快速地点击不同的颜色。计数器在那种情况下工作,但它们落后了,因为事件是先进先出处理的(包括 await colorRepository.storeColor(...)
和 await
以获取更新的颜色列表)。
我希望发送状态在任何点击后立即更新,即使之前的点击当前正在将其存储到存储库中也是如此。
如何让 BLoC 在另一个事件等待 API 响应时继续处理新事件?
请注意 使用 Bloc 的主要思想是 predictability - you will lose that predictability to some degree (depending on your concrete implementation). If you are using flutter_bloc
you could follow this suggestion 并覆盖您的 bloc 上的默认事件流处理。
@override
Stream<Transition<MyEvent, MyState>> transformEvents(
Stream<MyEvent> events, transitionFn) {
return events.flatMap(transitionFn);
}
您也可以查看 isolates and maybe especially flutters compute which lets you spin up an isolate to run your code. I found this 作为一个好的来源。
虽然我非常确定有更好的方法可以做到这一点,但我想出了以下方法。我删除了您的一些逻辑,使其更通用一些。
我不熟悉 dart 中 compute
和 isolate
的性能细节,所以我想声明这可能不是最佳实践方法,但 也许它可以帮助您入门。
import 'package:bloc/bloc.dart';
import 'package:flutter/foundation.dart';
import 'package:flutter/material.dart';
import 'dart:async';
void main() {
runApp(ExampleApp());
}
class ExampleApp extends StatelessWidget {
static ExampleBloc bloc = ExampleBloc();
@override
Widget build(BuildContext context) {
return MaterialApp(
home: TextButton(
onPressed: () => bloc.add(ExampleStartingEvent()),
child: Text("Trigger"),
),
);
}
}
// Top level function that is computed in isolate
Future<void> _mockRequest(String body) async {
// Do your async request here and await response
Future.delayed(Duration(seconds: 5));
ExampleBloc.successfulCompute("Successful!");
}
// Bloc
class ExampleBloc extends Bloc<ExampleEvent, ExampleState> {
ExampleBloc() : super(ExampleStateInitial());
static successfulCompute(String response) {
ExampleApp.bloc.add(ExampleEventSuccess(response));
}
@override
Stream<ExampleState> mapEventToState(
ExampleEvent event,
) async* {
if (event is ExampleEventSuccess) {
print(event.response);
yield ExampleStateSuccess(event.response);
}
if (event is ExampleStartingEvent) {
compute(_mockRequest, "body");
}
}
}
// Events
class ExampleEvent {}
class ExampleStartingEvent extends ExampleEvent {}
class ExampleEventSuccess extends ExampleEvent {
final response;
ExampleEventSuccess(this.response);
}
// States
class ExampleState {}
class ExampleStateInitial extends ExampleState {}
class ExampleStateSuccess extends ExampleState {
final response;
ExampleStateSuccess(this.response);
}
class ExampleStateError extends ExampleState {}
只是展示一个基于@kohjakob 的提议的解决方案,但是:
- 没有静态方法
- 完整的错误处理例程
这个想法基本上是将存储库调用包装到一个 async
方法 (_sendClick(...)
) 中,并将其称为非阻塞(即没有 await
),同时状态更新发送状态是同步完成的。
_sendClick(...)
等待存储库并在完成后将 ColorSendSuccess
或 ColorSendFailed
事件添加到 bloc。然后,这些事件在 mapEventToState(...)
例程的 运行 中处理。
class ColorsBloc extends Bloc<ColorsEvent, ColorsState> {
final ColorRepository colorRepository;
ColorsBloc({required this.colorRepository}) : super(ColorsState.initial());
@override
Stream<ColorsState> mapEventToState(
ColorsEvent event,
) async* {
if (event is ColorsFetchRequested) {
yield ColorsState.loading();
try {
final colors = await colorRepository.getColors();
yield ColorsState.success(colors);
} catch (e) {
yield ColorsState.error();
}
} else if (event is ColorCounted) {
yield* _mapColorCountedToState(event);
} else if (event is ColorSendSuccess) {
yield _mapColorSendSuccessToState(event);
} else if (event is ColorSendFailed) {
yield _mapColorSendFailedToState(event);
}
}
Stream<ColorsState> _mapColorCountedToState(ColorCounted event) async* {
yield state.copyWith(
sendingByColorId: {...state.sendingByColorId, event.colorId},
);
// non-blocking <----------------
_sendClick(Color(
colorId: event.colorId,
timestamp: DateTime.now().millisecondsSinceEpoch,
));
final colors = await colorRepository.getColors();
yield state.copyWith(
status: Status.success,
colors: colors,
sendingByColorId: {...state.sendingByColorId}..remove(event.colorId),
);
}
Future<void> _sendClick(Color color) async {
try {
int newId = await colorRepository.storeColor(color);
Color storedColor = color.copyWith(id: () => newId);
add(ColorSendSuccess(color: storedColor));
} on StoreColorClickException catch (_) {
add(ColorSendFailed(color: color));
}
}
ColorsState _mapColorSendSuccessToState(ColorCounted event) async* {
return state.copyWith(
colors: [...state.colors]
// replace the local color-click with the stored one
..removeWhere((element) => element.localId == event.color.localId)
..add(event.color.copyWith(localId: () => null)),
sendingByColorId: {...state.sendingByColorId}..remove(event.color.id),
);
}
ColorsState _mapColorSendFailedToState(ColorCounted event) async* {
return state.copyWith(
colors: [...state.colors]
// remove the color that has not been stored
..removeWhere((element) => element.localId == event.color.localId),
sendingByColorId: {...state.sendingByColorId}..remove(event.color.localId),
// mark the color as failed
errorByColorId: {...state.errorByColorId, event.color.localId},
);
}
}