RxJS/ReactiveX 正确的模块通信
RxJS/ReactiveX Proper modules communication
我对响应式编程还很陌生,但已经爱上了它。但是,仍然很难将我的大脑转移到它上面。我正在尝试遵循 "Avoid using subjects" 和 "Avoid impure functions" 的所有建议,当然还有 "Avoid imperative code."
我发现很难实现的是简单的跨模块通信,其中一个模块可以注册 "action"/observable,另一个可以订阅并对其做出反应。一个简单的消息总线可能会起作用,但这将强制使用我试图避免的主题和命令式代码风格。
所以这是我正在玩的一个简单的起点:
// some sandbox
class Api {
constructor() {
this.actions = {};
}
registerAction(actionName, action) {
// I guess this part will have to be changed
this.actions[actionName] = action.publishReplay(10).refCount();
//this.actions[actionName].connect();
}
getAction(actionName) {
return this.actions[actionName];
}
}
const api = new Api();
// -------------------------------------------------------------------
// module 1
let myAction = Rx.Observable.create((obs) => {
console.log("EXECUTING");
obs.next("42 " + Date.now());
obs.complete();
});
api.registerAction("myAction", myAction);
let myTrigger = Rx.Observable.interval(1000).take(2);
let executedAction = myTrigger
.flatMap(x => api.getAction("myAction"))
.subscribe(
(x) => { console.log(`executed action: ${x}`); },
(e) => {},
() => { console.log("completed");});
// -------------------------------------------------------------------
// module 2
api.getAction("myAction")
.subscribe(
(x) => { console.log(`SECOND executed action: ${x}`); },
(e) => {},
() => { console.log("SECOND completed");});
所以目前第二个模块订阅它 "triggers" "myAction" Observable。在现实生活中,这可能是一个 ajax 调用。有没有办法让所有订阅者 delay/wait 直到 "myAction" 从 module1 正确调用?再一次 - 使用主题很容易做到这一点,但我正在尝试按照推荐的做法去做。
如果我对你的理解是正确的,你想要确保,如果你调用 api.getAction,你希望那个 observable 中的下一个值等到对 getAction 的调用完成。在处理其他值之前。
您可以使用 concatMap 轻松实现这一点。 ConcatMap 将采用 returns 可观察的函数(在您的情况下是对 getAction 的调用)。 ConcatMap 将等待开始处理下一个值,直到函数中返回的 observable 完成。
因此,如果您像这样更改代码,它应该可以工作(如果我理解正确的话)。
let executedAction = myTrigger
.concatMap(x => api.getAction("myAction"))
.subscribe(
(x) => { console.log(`executed action: ${x}`); },
(e) => {},
() => { console.log("completed");});
如果 myTrigger 有一个新值,在从 api.getAction 返回的 observable 完成之前不会处理它。
所以这是一个比我想的要简单得多的解决方案。只需使用 2 个可观察量。使用调度程序和 subscribeOn 可以实现类似的效果。
// some sandbox
class Action {
constructor(name, observable) {
this.name = name;
this.observable = observable;
this.replay = new Rx.ReplaySubject(10);
}
}
function actionFactory(action, param) {
return Rx.Observable.create(obs => {
action.observable
.subscribe(x => {
obs.next(x);
action.replay.next(x);
}, (e) => {}, () => obs.complete);
});
}
class Api {
constructor() {
this.actions = {};
}
registerAction(actionName, action) {
let generatedAction = new Action(actionName, action);
this.actions[actionName] = generatedAction;
return actionFactory.bind(null, generatedAction);
}
getAction(actionName) {
return this.actions[actionName].replay;
}
}
const api = new Api();
// -------------------------------------------------------------------
// module 1
let myAction = Rx.Observable.create((obs) => {
obs.next("42 " + Date.now());
obs.complete();
});
let myRegisteredAction$ = api.registerAction("myAction", myAction);
let myTrigger = Rx.Observable.interval(1000).take(1).delay(1000);
let executedAction = myTrigger
.map(x => { return { someValue: x} })
.concatMap(x => myRegisteredAction$(x))
.subscribe(
(x) => { console.log(`MAIN: ${x}`); },
(e) => { console.log("error", e)},
() => { console.log("MAIN: completed");});
// -------------------------------------------------------------------
// module 2
var sub = api.getAction("myAction")
.subscribe(
(x) => { console.log(`SECOND: ${x}`); },
(e) => {console.log("error : " + e)},
() => { console.log("SECOND: completed");});
我对响应式编程还很陌生,但已经爱上了它。但是,仍然很难将我的大脑转移到它上面。我正在尝试遵循 "Avoid using subjects" 和 "Avoid impure functions" 的所有建议,当然还有 "Avoid imperative code."
我发现很难实现的是简单的跨模块通信,其中一个模块可以注册 "action"/observable,另一个可以订阅并对其做出反应。一个简单的消息总线可能会起作用,但这将强制使用我试图避免的主题和命令式代码风格。
所以这是我正在玩的一个简单的起点:
// some sandbox
class Api {
constructor() {
this.actions = {};
}
registerAction(actionName, action) {
// I guess this part will have to be changed
this.actions[actionName] = action.publishReplay(10).refCount();
//this.actions[actionName].connect();
}
getAction(actionName) {
return this.actions[actionName];
}
}
const api = new Api();
// -------------------------------------------------------------------
// module 1
let myAction = Rx.Observable.create((obs) => {
console.log("EXECUTING");
obs.next("42 " + Date.now());
obs.complete();
});
api.registerAction("myAction", myAction);
let myTrigger = Rx.Observable.interval(1000).take(2);
let executedAction = myTrigger
.flatMap(x => api.getAction("myAction"))
.subscribe(
(x) => { console.log(`executed action: ${x}`); },
(e) => {},
() => { console.log("completed");});
// -------------------------------------------------------------------
// module 2
api.getAction("myAction")
.subscribe(
(x) => { console.log(`SECOND executed action: ${x}`); },
(e) => {},
() => { console.log("SECOND completed");});
所以目前第二个模块订阅它 "triggers" "myAction" Observable。在现实生活中,这可能是一个 ajax 调用。有没有办法让所有订阅者 delay/wait 直到 "myAction" 从 module1 正确调用?再一次 - 使用主题很容易做到这一点,但我正在尝试按照推荐的做法去做。
如果我对你的理解是正确的,你想要确保,如果你调用 api.getAction,你希望那个 observable 中的下一个值等到对 getAction 的调用完成。在处理其他值之前。
您可以使用 concatMap 轻松实现这一点。 ConcatMap 将采用 returns 可观察的函数(在您的情况下是对 getAction 的调用)。 ConcatMap 将等待开始处理下一个值,直到函数中返回的 observable 完成。
因此,如果您像这样更改代码,它应该可以工作(如果我理解正确的话)。
let executedAction = myTrigger
.concatMap(x => api.getAction("myAction"))
.subscribe(
(x) => { console.log(`executed action: ${x}`); },
(e) => {},
() => { console.log("completed");});
如果 myTrigger 有一个新值,在从 api.getAction 返回的 observable 完成之前不会处理它。
所以这是一个比我想的要简单得多的解决方案。只需使用 2 个可观察量。使用调度程序和 subscribeOn 可以实现类似的效果。
// some sandbox
class Action {
constructor(name, observable) {
this.name = name;
this.observable = observable;
this.replay = new Rx.ReplaySubject(10);
}
}
function actionFactory(action, param) {
return Rx.Observable.create(obs => {
action.observable
.subscribe(x => {
obs.next(x);
action.replay.next(x);
}, (e) => {}, () => obs.complete);
});
}
class Api {
constructor() {
this.actions = {};
}
registerAction(actionName, action) {
let generatedAction = new Action(actionName, action);
this.actions[actionName] = generatedAction;
return actionFactory.bind(null, generatedAction);
}
getAction(actionName) {
return this.actions[actionName].replay;
}
}
const api = new Api();
// -------------------------------------------------------------------
// module 1
let myAction = Rx.Observable.create((obs) => {
obs.next("42 " + Date.now());
obs.complete();
});
let myRegisteredAction$ = api.registerAction("myAction", myAction);
let myTrigger = Rx.Observable.interval(1000).take(1).delay(1000);
let executedAction = myTrigger
.map(x => { return { someValue: x} })
.concatMap(x => myRegisteredAction$(x))
.subscribe(
(x) => { console.log(`MAIN: ${x}`); },
(e) => { console.log("error", e)},
() => { console.log("MAIN: completed");});
// -------------------------------------------------------------------
// module 2
var sub = api.getAction("myAction")
.subscribe(
(x) => { console.log(`SECOND: ${x}`); },
(e) => {console.log("error : " + e)},
() => { console.log("SECOND: completed");});