Chained redux-observable 史诗只能正确触发一次
Chained redux-observable epic only fires correctly once
我设置了一个等待另一个史诗完成的史诗,就像@jayphelps 在这里的回答:
但是我发现它似乎只 运行 一次。之后,我可以在控制台中看到 CART_CONFIG_READY
操作,但未触发 DO_THE_NEXT_THING
操作。
我尝试了 mergeMap
和 switchMap
的各种组合,有和没有 take
,但似乎没有任何帮助。
我的代码(有点)是这样的。
import { NgRedux } from '@angular-redux/store';
import { Observable } from 'rxjs/Observable';
import { ActionsObservable } from 'redux-observable';
export class CartEpicsService {
checkCart = (action$: ActionsObservable<any>, store: NgRedux<any>) => {
return action$.ofType('CHECK_CART')
.switchMap(() => {
console.log('___LISTENING___');
return action$.ofType('CART_CONFIG_READY')
.take(1) // removing this doesn't help
.mergeMap(() => {
console.log('___RECEIVED___');
// do stuff here
return Observable.of({
type: 'DO_THE_NEXT_THING'
});
})
.startWith({
type: 'GET_CART_CONFIG'
});
});
}
getCartConfig = (action$: ActionsObservable<any>, store: NgRedux<any>) => {
return action$.ofType('GET_CART_CONFIG')
.switchMap(() => {
const config = store.getState().config;
// we already have the config
if (config) {
return Observable.of({
type: 'CART_CONFIG_READY'
});
}
// otherwise load it from the server using out HTTP service
return this.http.get('/cart/config')
.switchMap((response) => {
return Observable.concat(
Observable.of({
type: 'CART_CONFIG_SUCCESS'
}),
Observable.of({
type: 'CART_CONFIG_READY'
})
);
})
.catch(error => Observable.of({
type: 'CART_CONFIG_ERROR',
error
}));
});
}
}
对于上下文,我需要来自 /cart/config 端点的响应来检查购物车的有效性。我只需要下载一次配置。
这是 JS Bin 上的一个 运行nable 示例:
天哪,这绝对是一个棘手的问题!
原因
当 state.config === true
你 return 一个同步发出的 CART_CONFIG_READY
的 Observable,而在第一次 http 请求(或延迟,在 jsbin 中)意味着它总是在进行异步。
为什么这会有所不同是在 checkCart
史诗中你 return 一个可观察的链,它用 action$.ofType('CART_CONFIG_READY')
监听 CART_CONFIG_READY
但也应用 .startWith({ type: 'GET_CART_CONFIG' })
.这意味着 GET_CART_CONFIG
将在 之前同步发出 action$.ofType('CART_CONFIG_READY')
被订阅,因为 startWith
是 basically shorthand for a concat,这可能会使如果您熟悉它,问题会更清楚。这与这样做几乎完全相同:
Observable.concat(
Observable.of({
type: 'GET_CART_CONFIG'
}),
action$.ofType('CART_CONFIG_READY') // not subscribed until prior complete()s
.take(1)
.mergeMap(() => {
// stuff
})
);
总而言之,GET_CART_CONFIG
第二次发生的事情是同步调度的,getCartConfig
收到它并看到配置已经在存储中,所以它同步调度 CART_CONFIG_READY
.但是我们还没有在 checkCart
中收听它,所以它没有得到答复。然后那个调用堆栈 returns 和 concat 中的下一个 Observable,我们的 action$.ofType('CART_CONFIG_READY')
链,被订阅。但为时已晚,它监听的动作已经发出!
解决方案
解决此问题的一种方法是使 CART_CONFIG_READY
的发射始终异步,或者在我们派发 之前 在另一个史诗中开始监听它 GET_CART_CONFIG
.
1。发射 CART_CONFIG_READY 异步
Observable.of
接受调度程序作为其最后一个参数,RxJS supports several of them。
在这种情况下,您可以使用 AsyncScheduler
(宏任务)或 AsapScheduler
(微任务)。在这种情况下两者都可以工作,但它们在 JavaScript 事件循环中安排不同的时间。如果您不熟悉事件循环任务,check this out.
我个人建议在这种情况下使用 AsyncSheduler
,因为它将提供最接近于发出 http 请求的异步行为。
import { async } from 'rxjs/scheduler/async';
// later inside your epic...
return Observable.of({
type: 'CART_CONFIG_READY'
}, async);
2。在发出 GET_CART_CONFIG
之前监听 CART_CONFIG_READY
因为 startWith
对于 concat
是 shorthand(我们不想这样做)我们需要使用某种形式的 merge
,我们的ofType
先链接,以便我们在发射前收听。
action$.ofType('CART_CONFIG_READY')
.take(1)
.mergeMap(() => {
// stuff
})
.merge(
Observable.of({ type: 'GET_CART_CONFIG' })
)
// or
Observable.merge(
action$.ofType('CART_CONFIG_READY')
.take(1)
.mergeMap(() => {
// stuff
}),
Observable.of({ type: 'GET_CART_CONFIG' })
)
// both are exactly the same, pick personal preference on appearance
您只需执行其中一种解决方案,但同时执行这两种解决方案也不会有什么坏处。副手我可能会建议同时使用两者,以便事情一致和符合预期,即使它们有点冗长。
您可能也很高兴知道 Observable.of
接受任意数量的项目,这些项目将按顺序发出。所以你不需要使用 concat
:
// before
Observable.concat(
Observable.of({
type: 'CART_CONFIG_SUCCESS'
}),
Observable.of({
type: 'CART_CONFIG_READY'
})
)
// after
Observable.of({
type: 'CART_CONFIG_SUCCESS'
}, {
type: 'CART_CONFIG_READY'
})
非常感谢 jsbin 顺便说一句,它使调试变得 很多 更容易。
根据您的评论进行编辑:
Out of curiosity did you figure this out through experience or debugging?
两者的结合。我已经处理了大量 async/scheduled 代码,而排序通常是问题的根源。我扫描了代码,在脑海中想象执行过程,注意到异步与同步的区别取决于代码路径,然后我做了一个快速操作符,让我可以轻松地确认订阅任何 Observable 链的顺序。
Observable.prototype.logOnSubscribe = function (msg) {
// defer is a pretty useful Observable to learn if you haven't yet
return Observable.defer(() => {
console.log(msg);
return this; // the original source
});
};
我把它应用到几个地方,但最重要的是这两个:
action$.ofType('CART_CONFIG_READY')
.take(1)
.mergeMap(() => {
// stuff
})
.logOnSubscribe('listening for CART_CONFIG_READY') // <--- here
.startWith({
type: 'GET_CART_CONFIG'
});
// and in the other epic...
if (hasConfig) {
return Observable.of({
type: 'CART_CONFIG_READY'
})
.logOnSubscribe('emitting CART_CONFIG_READY'); // <--- and here
}
它确认在第二个代码路径中 CART_CONFIG_READY
在另一个史诗正在侦听它之前被发出。
我设置了一个等待另一个史诗完成的史诗,就像@jayphelps 在这里的回答:
但是我发现它似乎只 运行 一次。之后,我可以在控制台中看到 CART_CONFIG_READY
操作,但未触发 DO_THE_NEXT_THING
操作。
我尝试了 mergeMap
和 switchMap
的各种组合,有和没有 take
,但似乎没有任何帮助。
我的代码(有点)是这样的。
import { NgRedux } from '@angular-redux/store';
import { Observable } from 'rxjs/Observable';
import { ActionsObservable } from 'redux-observable';
export class CartEpicsService {
checkCart = (action$: ActionsObservable<any>, store: NgRedux<any>) => {
return action$.ofType('CHECK_CART')
.switchMap(() => {
console.log('___LISTENING___');
return action$.ofType('CART_CONFIG_READY')
.take(1) // removing this doesn't help
.mergeMap(() => {
console.log('___RECEIVED___');
// do stuff here
return Observable.of({
type: 'DO_THE_NEXT_THING'
});
})
.startWith({
type: 'GET_CART_CONFIG'
});
});
}
getCartConfig = (action$: ActionsObservable<any>, store: NgRedux<any>) => {
return action$.ofType('GET_CART_CONFIG')
.switchMap(() => {
const config = store.getState().config;
// we already have the config
if (config) {
return Observable.of({
type: 'CART_CONFIG_READY'
});
}
// otherwise load it from the server using out HTTP service
return this.http.get('/cart/config')
.switchMap((response) => {
return Observable.concat(
Observable.of({
type: 'CART_CONFIG_SUCCESS'
}),
Observable.of({
type: 'CART_CONFIG_READY'
})
);
})
.catch(error => Observable.of({
type: 'CART_CONFIG_ERROR',
error
}));
});
}
}
对于上下文,我需要来自 /cart/config 端点的响应来检查购物车的有效性。我只需要下载一次配置。
这是 JS Bin 上的一个 运行nable 示例:
天哪,这绝对是一个棘手的问题!
原因
当 state.config === true
你 return 一个同步发出的 CART_CONFIG_READY
的 Observable,而在第一次 http 请求(或延迟,在 jsbin 中)意味着它总是在进行异步。
为什么这会有所不同是在 checkCart
史诗中你 return 一个可观察的链,它用 action$.ofType('CART_CONFIG_READY')
监听 CART_CONFIG_READY
但也应用 .startWith({ type: 'GET_CART_CONFIG' })
.这意味着 GET_CART_CONFIG
将在 之前同步发出 action$.ofType('CART_CONFIG_READY')
被订阅,因为 startWith
是 basically shorthand for a concat,这可能会使如果您熟悉它,问题会更清楚。这与这样做几乎完全相同:
Observable.concat(
Observable.of({
type: 'GET_CART_CONFIG'
}),
action$.ofType('CART_CONFIG_READY') // not subscribed until prior complete()s
.take(1)
.mergeMap(() => {
// stuff
})
);
总而言之,GET_CART_CONFIG
第二次发生的事情是同步调度的,getCartConfig
收到它并看到配置已经在存储中,所以它同步调度 CART_CONFIG_READY
.但是我们还没有在 checkCart
中收听它,所以它没有得到答复。然后那个调用堆栈 returns 和 concat 中的下一个 Observable,我们的 action$.ofType('CART_CONFIG_READY')
链,被订阅。但为时已晚,它监听的动作已经发出!
解决方案
解决此问题的一种方法是使 CART_CONFIG_READY
的发射始终异步,或者在我们派发 之前 在另一个史诗中开始监听它 GET_CART_CONFIG
.
1。发射 CART_CONFIG_READY 异步
Observable.of
接受调度程序作为其最后一个参数,RxJS supports several of them。
在这种情况下,您可以使用 AsyncScheduler
(宏任务)或 AsapScheduler
(微任务)。在这种情况下两者都可以工作,但它们在 JavaScript 事件循环中安排不同的时间。如果您不熟悉事件循环任务,check this out.
我个人建议在这种情况下使用 AsyncSheduler
,因为它将提供最接近于发出 http 请求的异步行为。
import { async } from 'rxjs/scheduler/async';
// later inside your epic...
return Observable.of({
type: 'CART_CONFIG_READY'
}, async);
2。在发出 GET_CART_CONFIG
之前监听 CART_CONFIG_READY
因为 startWith
对于 concat
是 shorthand(我们不想这样做)我们需要使用某种形式的 merge
,我们的ofType
先链接,以便我们在发射前收听。
action$.ofType('CART_CONFIG_READY')
.take(1)
.mergeMap(() => {
// stuff
})
.merge(
Observable.of({ type: 'GET_CART_CONFIG' })
)
// or
Observable.merge(
action$.ofType('CART_CONFIG_READY')
.take(1)
.mergeMap(() => {
// stuff
}),
Observable.of({ type: 'GET_CART_CONFIG' })
)
// both are exactly the same, pick personal preference on appearance
您只需执行其中一种解决方案,但同时执行这两种解决方案也不会有什么坏处。副手我可能会建议同时使用两者,以便事情一致和符合预期,即使它们有点冗长。
您可能也很高兴知道 Observable.of
接受任意数量的项目,这些项目将按顺序发出。所以你不需要使用 concat
:
// before
Observable.concat(
Observable.of({
type: 'CART_CONFIG_SUCCESS'
}),
Observable.of({
type: 'CART_CONFIG_READY'
})
)
// after
Observable.of({
type: 'CART_CONFIG_SUCCESS'
}, {
type: 'CART_CONFIG_READY'
})
非常感谢 jsbin 顺便说一句,它使调试变得 很多 更容易。
根据您的评论进行编辑:
Out of curiosity did you figure this out through experience or debugging?
两者的结合。我已经处理了大量 async/scheduled 代码,而排序通常是问题的根源。我扫描了代码,在脑海中想象执行过程,注意到异步与同步的区别取决于代码路径,然后我做了一个快速操作符,让我可以轻松地确认订阅任何 Observable 链的顺序。
Observable.prototype.logOnSubscribe = function (msg) {
// defer is a pretty useful Observable to learn if you haven't yet
return Observable.defer(() => {
console.log(msg);
return this; // the original source
});
};
我把它应用到几个地方,但最重要的是这两个:
action$.ofType('CART_CONFIG_READY')
.take(1)
.mergeMap(() => {
// stuff
})
.logOnSubscribe('listening for CART_CONFIG_READY') // <--- here
.startWith({
type: 'GET_CART_CONFIG'
});
// and in the other epic...
if (hasConfig) {
return Observable.of({
type: 'CART_CONFIG_READY'
})
.logOnSubscribe('emitting CART_CONFIG_READY'); // <--- and here
}
它确认在第二个代码路径中 CART_CONFIG_READY
在另一个史诗正在侦听它之前被发出。