websocket 自动重新连接 flutter 和 riverpod?
websocket automatic reconnect with flutter & riverpod?
1. OBJECTIVE
我希望我的自定义 WebSocket 服务器 (API) 和我的 Flutter 应用程序之间的连接能够在遇到网络问题或 WebSocket 服务器遇到问题时自动重新建立。
- 用例 1:wifi 停止并突然恢复。
- 用例2:API未启动突然重启
- 约束:我使用 Riverpod 作为状态管理库(我想保留它 :))。
我强调状态管理库是因为我在 StreamProvider 中创建了 WS 连接(参见 Riverpod)。
2。没有自动重新连接的初始设置
- 我创建了一个 StreamProvider,如下所示:
final hostProvider =
StreamProvider.autoDispose.family<Host, String>((ref, ip) async* {
//SOCKET OPEN
final channel = IOWebSocketChannel.connect('ws://$ip:$port/v1/path');
ref.onDispose(() {
// SOCKET CLOSE
return channel.sink.close();
});
await for (final json in channel.stream) {
final jsonStr = jsonDecode(json as String);
yield Host.fromJson(jsonStr as Map<String, dynamic>);
}
});
- 然后我创建了一个小部件来使用数据:
useProvider(hostProvider(ip)).when(
data: (data) => show the result
loading: () => show progress bar
error: (error, _) => show error
);
这段代码效果很好。但是没有自动重连机制。
3。自动重新连接尝试次数
- 每当捕获到异常时,我都会在 try/catch 中调用函数 connectWs:
final hostProvider =
StreamProvider.autoDispose.family<Host, String>((ref, ip) async* {
// Open the connection
connectWs('ws://$ip:$port/v1/path').then((value) async* {
final channel = IOWebSocketChannel(value);
ref.onDispose(() {
return channel.sink.close();
});
await for (final json in channel.stream) {
final jsonStr = jsonDecode(json as String);
yield Host.fromJson(jsonStr as Map<String, dynamic>);
}
});
});
Future<WebSocket> connectWs(String path) async {
try {
return await WebSocket.connect(path);
} catch (e) {
print("Error! " + e.toString());
await Future.delayed(Duration(milliseconds: 2000));
return await connectWs(path);
}
}
- 我创建了一个 connectProvider 提供者,如下所示,我 'watched' 在 hostProvider 中创建一个通道。每当出现异常时,我都会使用 Riverpod 库中的刷新功能来重新创建频道:
// used in hostProvider
ref.container.refresh(connectProvider(ip))
final connectProvider =
Provider.family<Host, String>((ref, ip) {
//SOCKET OPEN
return IOWebSocketChannel.connect('ws://$ip:$port/v1/path');
});
在此先感谢您的帮助。
我是 riverpod 的新手,但在我看来您想使用更高级别的 redux/bloc 样式流程在每次失败时重新创建提供程序...
这个更高级别的集团在连接成功时创建提供者,当连接失败时,您向集团发送一个事件,告诉它重新连接并重新创建提供者...
这是我的想法,但我还是这个包的初学者。
谢谢,@Dewey。
最后,我找到了适用于我的用例的解决方法:
我的提供商:channelProvider 和 streamProvider
static final channelProvider = Provider.autoDispose
.family<IOWebSocketChannel, HttpParam>((ref, httpParam) {
log.i('channelProvider | Metrics - $httpParam');
return IOWebSocketChannel.connect(
'ws://${httpParam.ip}:$port/v1/${httpParam.path}');
});
static final streamProvider =
StreamProvider.autoDispose.family<dynamic, HttpParam>((ref, httpParam) {
log.i('streamProvider | Metrics - $httpParam');
log.i('streamProvider | Metrics - socket ${httpParam.path} opened');
var bStream = ref
.watch(channelProvider(httpParam))
.stream
.asBroadcastStream(onCancel: (sub) => sub.cancel());
var isSubControlError = false;
final sub = bStream.listen(
(data) {
ref
.watch(channelProvider(httpParam))
.sink
?.add('> sink add ${httpParam.path}');
},
onError: (_, stack) => null,
onDone: () async {
isSubControlError = true;
await Future.delayed(Duration(seconds: 10));
ref.container.refresh(channelProvider(httpParam));
},
);
ref.onDispose(() {
log.i('streamProvider | Metrics - socket ${httpParam.path} closed');
sub.cancel();
if (isSubControlError == false)
ref.watch(channelProvider(httpParam)).sink?.close(1001);
bStream = null;
});
return bStream;
});
我在我的小部件中以这种方式使用 streamProvider:
return useProvider(MetricsWsRepository.streamProvider(HttpParam(
ip: ip,
path: 'dummy-path',
))).when(
data: (data) => deserialize & doSomething1,
loading:() => doSomething2,
error: (_, stack) => doSomething3
)
1. OBJECTIVE
我希望我的自定义 WebSocket 服务器 (API) 和我的 Flutter 应用程序之间的连接能够在遇到网络问题或 WebSocket 服务器遇到问题时自动重新建立。
- 用例 1:wifi 停止并突然恢复。
- 用例2:API未启动突然重启
- 约束:我使用 Riverpod 作为状态管理库(我想保留它 :))。 我强调状态管理库是因为我在 StreamProvider 中创建了 WS 连接(参见 Riverpod)。
2。没有自动重新连接的初始设置
- 我创建了一个 StreamProvider,如下所示:
final hostProvider =
StreamProvider.autoDispose.family<Host, String>((ref, ip) async* {
//SOCKET OPEN
final channel = IOWebSocketChannel.connect('ws://$ip:$port/v1/path');
ref.onDispose(() {
// SOCKET CLOSE
return channel.sink.close();
});
await for (final json in channel.stream) {
final jsonStr = jsonDecode(json as String);
yield Host.fromJson(jsonStr as Map<String, dynamic>);
}
});
- 然后我创建了一个小部件来使用数据:
useProvider(hostProvider(ip)).when(
data: (data) => show the result
loading: () => show progress bar
error: (error, _) => show error
);
这段代码效果很好。但是没有自动重连机制。
3。自动重新连接尝试次数
- 每当捕获到异常时,我都会在 try/catch 中调用函数 connectWs:
final hostProvider =
StreamProvider.autoDispose.family<Host, String>((ref, ip) async* {
// Open the connection
connectWs('ws://$ip:$port/v1/path').then((value) async* {
final channel = IOWebSocketChannel(value);
ref.onDispose(() {
return channel.sink.close();
});
await for (final json in channel.stream) {
final jsonStr = jsonDecode(json as String);
yield Host.fromJson(jsonStr as Map<String, dynamic>);
}
});
});
Future<WebSocket> connectWs(String path) async {
try {
return await WebSocket.connect(path);
} catch (e) {
print("Error! " + e.toString());
await Future.delayed(Duration(milliseconds: 2000));
return await connectWs(path);
}
}
- 我创建了一个 connectProvider 提供者,如下所示,我 'watched' 在 hostProvider 中创建一个通道。每当出现异常时,我都会使用 Riverpod 库中的刷新功能来重新创建频道:
// used in hostProvider
ref.container.refresh(connectProvider(ip))
final connectProvider =
Provider.family<Host, String>((ref, ip) {
//SOCKET OPEN
return IOWebSocketChannel.connect('ws://$ip:$port/v1/path');
});
在此先感谢您的帮助。
我是 riverpod 的新手,但在我看来您想使用更高级别的 redux/bloc 样式流程在每次失败时重新创建提供程序...
这个更高级别的集团在连接成功时创建提供者,当连接失败时,您向集团发送一个事件,告诉它重新连接并重新创建提供者...
这是我的想法,但我还是这个包的初学者。
谢谢,@Dewey。
最后,我找到了适用于我的用例的解决方法:
我的提供商:channelProvider 和 streamProvider
static final channelProvider = Provider.autoDispose
.family<IOWebSocketChannel, HttpParam>((ref, httpParam) {
log.i('channelProvider | Metrics - $httpParam');
return IOWebSocketChannel.connect(
'ws://${httpParam.ip}:$port/v1/${httpParam.path}');
});
static final streamProvider =
StreamProvider.autoDispose.family<dynamic, HttpParam>((ref, httpParam) {
log.i('streamProvider | Metrics - $httpParam');
log.i('streamProvider | Metrics - socket ${httpParam.path} opened');
var bStream = ref
.watch(channelProvider(httpParam))
.stream
.asBroadcastStream(onCancel: (sub) => sub.cancel());
var isSubControlError = false;
final sub = bStream.listen(
(data) {
ref
.watch(channelProvider(httpParam))
.sink
?.add('> sink add ${httpParam.path}');
},
onError: (_, stack) => null,
onDone: () async {
isSubControlError = true;
await Future.delayed(Duration(seconds: 10));
ref.container.refresh(channelProvider(httpParam));
},
);
ref.onDispose(() {
log.i('streamProvider | Metrics - socket ${httpParam.path} closed');
sub.cancel();
if (isSubControlError == false)
ref.watch(channelProvider(httpParam)).sink?.close(1001);
bStream = null;
});
return bStream;
});
我在我的小部件中以这种方式使用 streamProvider:
return useProvider(MetricsWsRepository.streamProvider(HttpParam(
ip: ip,
path: 'dummy-path',
))).when(
data: (data) => deserialize & doSomething1,
loading:() => doSomething2,
error: (_, stack) => doSomething3
)