rxjs/redux-observer:observable.retry 到 re-establish 连接
rxjs/redux-observer: observable.retry to re-establish connection
我正在构建的应用程序中使用 elixir phoenix websocket,并且我有一个如下所示的史诗:
const socketObservable = Observable.create((observer: Object) => {
const socket = new Socket(`${getWebSocketUrl()}/socket`, { params: {
token: readSession(),
} });
socket.connect();
socket.onOpen(() =>
observer.next({ type: SOCKET_CONNECTED, socket }),
);
socket.onError((error) =>
observer.error({ type: WEBSOCKET_ERROR, error }),
);
return () => {
// socket.disconnect();
};
});
const connectToSocket = (
action$: Object,
) => action$.ofType(CONNECT_TO_SOCKET)
.switchMap(() =>
socketObservable
.catch((error) => Observable.of(error)),
)
.retry();
export default connectToSocket;
我想要发生的是当网络连接断开时通过发出 { type: WEBSOCKET_ERROR, error }
通知用户,并在通过发出 { type: SOCKET_CONNECTED, socket }
重新建立连接时删除通知。好吧,我让第一部分工作了,但是当 re-connection 发生时,{ type: SOCKET_CONNECTED, socket }
永远不会调度。使用 redux-saga,我能够使用以下代码完成这项工作:
const connectToSocket = (): Object =>
eventChannel((emitter: (Object) => mixed) => {
const socket = new Socket(`${getWebSocketUrl()}/socket`, { params: {
token: readSession(),
} });
socket.connect();
socket.onOpen(() => emitter({ socket }));
socket.onError((error) => {
emitter({ error });
});
return () => {
// socket.disconnect();
};
});
export function* callConnectToSocket(): Generator<IOEffect, *, *> {
const chan = yield call(connectToSocket);
while (true) {
const { socket, error } = yield take(chan);
if (socket) {
yield put({ type: SOCKET_CONNECTED, socket });
} else {
yield put({ error, type: WEBSOCKET_ERROR });
}
}
}
export function* watchConnectToSocket(): Generator<IOEffect, *, *> {
yield takeLatest(CONNECT_TO_SOCKET, callConnectToSocket);
}
对于 rxjs 代码,如果按照 documentation for rxjs Observable.retry 发出错误,我认为在链的末尾添加 .retry()
应该会触发我的源可观察对象的重试,但可能是我真的不明白 retry
应该做什么或如何正确使用它。可能有人可以帮助实现我想要的。
好吧,我终于放弃了在连接断开时抛出错误并将此行 observer.error({ type: WEBSOCKET_ERROR, error })
更改为 observer.next({ type: WEBSOCKET_ERROR, error })
。但我仍然想知道我在 retry
上做错了什么。任何对原始代码的帮助将不胜感激。
要使 retry
运算符生效,其源可观察对象必须产生错误。在您的示例中,错误通知似乎永远不会到达 retry
,因为它被从错误中恢复的 catch
运算符吞没了。
为了使其工作,您可以尝试使 catch
运算符 return 成为一个首先发出操作然后产生错误的可观察对象:
const connectToSocket = action$ =>
actions$.ofType(CONNECT_TO_SOCKET)
.switchMap(() => socketObservable
.catch(error => Observable.of(error).concat(Observable.throw(error)))
)
.retry();
更新:
我认为值得一提的是 Rx
遵循语法 next* (complete|error)?
,这意味着 next()
在同一观察者上的 error()
之后的调用将没有影响。因此,如果 socket
从错误中恢复并在执行 onError
后执行 onOpen
回调,则 SOCKET_CONNECTED
通知将不会到达消费者。
这可以通过将 error
替换为 next
通知或每次发生错误时重新启动 socketObservable
来处理,这意味着新的 socket
实例将被创建(但这可能不是你想要的)。
这是一个可运行的代码示例,演示了 retry
的工作原理:
const { createStore, applyMiddleware } = Redux;
const { createEpicMiddleware } = ReduxObservable;
const socketObservable = Rx.Observable.create(observer => {
const t1 = setTimeout(() => observer.next({ type: "SOCKET_CONNECTED" }), 200);
const t2 = setTimeout(() => observer.error({ type: "SOCKET_ERROR" }), 400);
return () => {
clearTimeout(t1);
clearTimeout(t2);
};
})
const connectToSocket = action$ => action$
.do(action => console.log(action))
.ofType("CONNECT_TO_SOCKET")
.switchMap(() => socketObservable
.catch(error => Rx.Observable.of(error).concat(Rx.Observable.throw(error)))
// make 2 attempts to re-connect, i.e. restart socketObservable
.retry(2)
)
// recover in case if both attempts to reconnect have failed
.retry();
const store = createStore(
(state, action) => state,
applyMiddleware(createEpicMiddleware(connectToSocket)));
// dispatch CONNECT_TO_SOCKET two times
Rx.Observable.interval(2000)
.take(2)
.subscribe(x => store.dispatch({ type: "CONNECT_TO_SOCKET" }));
<script src="https://unpkg.com/rxjs@5.4.2/bundles/Rx.min.js"></script>
<script src="https://unpkg.com/redux@3.7.2/dist/redux.min.js"></script>
<script src="https://unpkg.com/redux-observable@0.14.1/dist/redux-observable.min.js"></script>
我正在构建的应用程序中使用 elixir phoenix websocket,并且我有一个如下所示的史诗:
const socketObservable = Observable.create((observer: Object) => {
const socket = new Socket(`${getWebSocketUrl()}/socket`, { params: {
token: readSession(),
} });
socket.connect();
socket.onOpen(() =>
observer.next({ type: SOCKET_CONNECTED, socket }),
);
socket.onError((error) =>
observer.error({ type: WEBSOCKET_ERROR, error }),
);
return () => {
// socket.disconnect();
};
});
const connectToSocket = (
action$: Object,
) => action$.ofType(CONNECT_TO_SOCKET)
.switchMap(() =>
socketObservable
.catch((error) => Observable.of(error)),
)
.retry();
export default connectToSocket;
我想要发生的是当网络连接断开时通过发出 { type: WEBSOCKET_ERROR, error }
通知用户,并在通过发出 { type: SOCKET_CONNECTED, socket }
重新建立连接时删除通知。好吧,我让第一部分工作了,但是当 re-connection 发生时,{ type: SOCKET_CONNECTED, socket }
永远不会调度。使用 redux-saga,我能够使用以下代码完成这项工作:
const connectToSocket = (): Object =>
eventChannel((emitter: (Object) => mixed) => {
const socket = new Socket(`${getWebSocketUrl()}/socket`, { params: {
token: readSession(),
} });
socket.connect();
socket.onOpen(() => emitter({ socket }));
socket.onError((error) => {
emitter({ error });
});
return () => {
// socket.disconnect();
};
});
export function* callConnectToSocket(): Generator<IOEffect, *, *> {
const chan = yield call(connectToSocket);
while (true) {
const { socket, error } = yield take(chan);
if (socket) {
yield put({ type: SOCKET_CONNECTED, socket });
} else {
yield put({ error, type: WEBSOCKET_ERROR });
}
}
}
export function* watchConnectToSocket(): Generator<IOEffect, *, *> {
yield takeLatest(CONNECT_TO_SOCKET, callConnectToSocket);
}
对于 rxjs 代码,如果按照 documentation for rxjs Observable.retry 发出错误,我认为在链的末尾添加 .retry()
应该会触发我的源可观察对象的重试,但可能是我真的不明白 retry
应该做什么或如何正确使用它。可能有人可以帮助实现我想要的。
好吧,我终于放弃了在连接断开时抛出错误并将此行 observer.error({ type: WEBSOCKET_ERROR, error })
更改为 observer.next({ type: WEBSOCKET_ERROR, error })
。但我仍然想知道我在 retry
上做错了什么。任何对原始代码的帮助将不胜感激。
要使 retry
运算符生效,其源可观察对象必须产生错误。在您的示例中,错误通知似乎永远不会到达 retry
,因为它被从错误中恢复的 catch
运算符吞没了。
为了使其工作,您可以尝试使 catch
运算符 return 成为一个首先发出操作然后产生错误的可观察对象:
const connectToSocket = action$ =>
actions$.ofType(CONNECT_TO_SOCKET)
.switchMap(() => socketObservable
.catch(error => Observable.of(error).concat(Observable.throw(error)))
)
.retry();
更新:
我认为值得一提的是 Rx
遵循语法 next* (complete|error)?
,这意味着 next()
在同一观察者上的 error()
之后的调用将没有影响。因此,如果 socket
从错误中恢复并在执行 onError
后执行 onOpen
回调,则 SOCKET_CONNECTED
通知将不会到达消费者。
这可以通过将 error
替换为 next
通知或每次发生错误时重新启动 socketObservable
来处理,这意味着新的 socket
实例将被创建(但这可能不是你想要的)。
这是一个可运行的代码示例,演示了 retry
的工作原理:
const { createStore, applyMiddleware } = Redux;
const { createEpicMiddleware } = ReduxObservable;
const socketObservable = Rx.Observable.create(observer => {
const t1 = setTimeout(() => observer.next({ type: "SOCKET_CONNECTED" }), 200);
const t2 = setTimeout(() => observer.error({ type: "SOCKET_ERROR" }), 400);
return () => {
clearTimeout(t1);
clearTimeout(t2);
};
})
const connectToSocket = action$ => action$
.do(action => console.log(action))
.ofType("CONNECT_TO_SOCKET")
.switchMap(() => socketObservable
.catch(error => Rx.Observable.of(error).concat(Rx.Observable.throw(error)))
// make 2 attempts to re-connect, i.e. restart socketObservable
.retry(2)
)
// recover in case if both attempts to reconnect have failed
.retry();
const store = createStore(
(state, action) => state,
applyMiddleware(createEpicMiddleware(connectToSocket)));
// dispatch CONNECT_TO_SOCKET two times
Rx.Observable.interval(2000)
.take(2)
.subscribe(x => store.dispatch({ type: "CONNECT_TO_SOCKET" }));
<script src="https://unpkg.com/rxjs@5.4.2/bundles/Rx.min.js"></script>
<script src="https://unpkg.com/redux@3.7.2/dist/redux.min.js"></script>
<script src="https://unpkg.com/redux-observable@0.14.1/dist/redux-observable.min.js"></script>