正确使用 takeUntil() 停止 observable
Correctly use takeUntil() for stopping observable
我正在尝试创建 2 个基本上执行逆运算的可观察对象。它是一个服务发现端点,因此在启动应用程序时,它必须尝试注册到服务发现,直到成功为止。所以我想创建一个这样的可观察对象:
const create$ = Rx.Observable.create((observer) => {
observer.next('Trying to create observation');
sp.put(endpoint, { json: true }, (err, res, payload) => {
err ? observer.error(err) : observer.complete();
});
});
而当应用程序在正常关闭时,我想做反向操作。像这样:
const delete$ = Rx.Observable.create((observer) => {
console.log('deleted subscribed');
observer.next('Trying to delete observation');
sp.delete(endpoint, { json: true }, (err, res, payload) => {
err ? observer.error(err) : observer.complete();
});
});
所以我决定创建一个函数,该函数 returns 一个具有 .create()
和 .delete()
的对象。我要解决的问题是,当应用程序启动并尝试注册时,但无法到达服务发现端点,稍后应用程序启动它的正常关闭过程并调用 .delete()
操作,然后 .create()
操作不应该再运行了。
function observe({ url, version, serviceName }) {
const endpoint = `/endpoint/${serviceName}/${version}/${encodeURIComponent(url)}`;
const create$ = Rx.Observable.create((observer) => {
observer.next('Trying to create observation');
sp.put(endpoint, { json: true }, (err, res, payload) => {
err ? observer.error(err) : observer.complete();
});
});
const delete$ = Rx.Observable.create((observer) => {
console.log('deleted subscribed');
observer.next('Trying to delete observation');
sp.delete(endpoint, { json: true }, (err, res, payload) => {
err ? observer.error(err) : observer.complete();
});
});
return {
create() {
return create$.retry(Number.POSITIVE_INFINITY).takeUntil(delete$); // This is where I would want to takeUntil()
},
delete({ interval = 5000, times = 0 } = {}) {
return delete$.retry(times);
},
}
}
我遇到的问题是,当使用 .takeUntil()
时,它订阅了 delete$
,它开始执行删除操作,其效果是立即停止 create$
observable。
我试过 takeUntil(Observable.merge(Observable.never(), delete$))
但它同时订阅了两者,所以不起作用。我也尝试过 takeUntil(Observable.concat(Observable.never(), delete$))
第一个永远不会结束 (:P),第二个永远不会被订阅。
大多数 Observable 是 cold 和 unicast。这是一个有点冗长的话题,所以我将在很大程度上遵从这篇伟大的文章 Hot vs Cold Observables,但总结一下:
COLD is when your observable creates the producer
// COLD
var cold = new Observable((observer) => {
var producer = new Producer();
// have observer listen to producer here
});
HOT is when your observable closes over the producer
// HOT
var producer = new Producer();
var hot = new Observable((observer) => {
// have observer listen to producer here
});
在您的示例中,这是一个关键的区别,因为 create$
和 delete$
都是冷的。因此,正如您发现的那样,向 takeUntil
提供 delete$
会导致订阅 delete$
,从而启动请求。
如果您想保持代码的 structure/API 不变,实现此目的的一种方法是使用被视为某种 "notifier" 的主题。主题是多播和 "hot"(即使他们自己不做任何事情)。
function observe({ url, version, serviceName }) {
// etc...
const shutdown$ = new Subject(); // <---------------- create our notifier
return {
create() {
return create$
.retry(Number.POSITIVE_INFINITY)
.takeUntil(shutdown$); // <-------------------- take our notifier
},
delete({ interval = 5000, times = 0 } = {}) {
return Observable.defer(() => {
shutdown$.next(); // <------------------------- notify
return delete$.retry(times);
});
}
};
}
我们使用了 Observable.defer()
这样我们就可以在有人实际订阅我们 return.
的 Observable 时执行 shutdown$.next()
副作用
方法 #1
您可以在 .takeUntill
中使用 Observable,而无需通过创建中间体 Subject
实际订阅它。然后您可以订阅该主题而不是原始主题。是这样的:
const delete$ = new Subject();
...
create() {
return create$
.takeUntill(delete$);
},
delete() {
create(...).subscribe(delete$);
return delete;
}
方法 #2
但是在你的情况下,我认为创建主题会更好,它会在调用 .delete
时通知。像这样:
const onDelete$ = new Subject();
...
create() {
return create$
.takeUntill(onDelete$);
},
delete() {
onDelete$.next();
return ...
}
我正在尝试创建 2 个基本上执行逆运算的可观察对象。它是一个服务发现端点,因此在启动应用程序时,它必须尝试注册到服务发现,直到成功为止。所以我想创建一个这样的可观察对象:
const create$ = Rx.Observable.create((observer) => {
observer.next('Trying to create observation');
sp.put(endpoint, { json: true }, (err, res, payload) => {
err ? observer.error(err) : observer.complete();
});
});
而当应用程序在正常关闭时,我想做反向操作。像这样:
const delete$ = Rx.Observable.create((observer) => {
console.log('deleted subscribed');
observer.next('Trying to delete observation');
sp.delete(endpoint, { json: true }, (err, res, payload) => {
err ? observer.error(err) : observer.complete();
});
});
所以我决定创建一个函数,该函数 returns 一个具有 .create()
和 .delete()
的对象。我要解决的问题是,当应用程序启动并尝试注册时,但无法到达服务发现端点,稍后应用程序启动它的正常关闭过程并调用 .delete()
操作,然后 .create()
操作不应该再运行了。
function observe({ url, version, serviceName }) {
const endpoint = `/endpoint/${serviceName}/${version}/${encodeURIComponent(url)}`;
const create$ = Rx.Observable.create((observer) => {
observer.next('Trying to create observation');
sp.put(endpoint, { json: true }, (err, res, payload) => {
err ? observer.error(err) : observer.complete();
});
});
const delete$ = Rx.Observable.create((observer) => {
console.log('deleted subscribed');
observer.next('Trying to delete observation');
sp.delete(endpoint, { json: true }, (err, res, payload) => {
err ? observer.error(err) : observer.complete();
});
});
return {
create() {
return create$.retry(Number.POSITIVE_INFINITY).takeUntil(delete$); // This is where I would want to takeUntil()
},
delete({ interval = 5000, times = 0 } = {}) {
return delete$.retry(times);
},
}
}
我遇到的问题是,当使用 .takeUntil()
时,它订阅了 delete$
,它开始执行删除操作,其效果是立即停止 create$
observable。
我试过 takeUntil(Observable.merge(Observable.never(), delete$))
但它同时订阅了两者,所以不起作用。我也尝试过 takeUntil(Observable.concat(Observable.never(), delete$))
第一个永远不会结束 (:P),第二个永远不会被订阅。
大多数 Observable 是 cold 和 unicast。这是一个有点冗长的话题,所以我将在很大程度上遵从这篇伟大的文章 Hot vs Cold Observables,但总结一下:
COLD is when your observable creates the producer
// COLD var cold = new Observable((observer) => { var producer = new Producer(); // have observer listen to producer here });
HOT is when your observable closes over the producer
// HOT var producer = new Producer(); var hot = new Observable((observer) => { // have observer listen to producer here });
在您的示例中,这是一个关键的区别,因为 create$
和 delete$
都是冷的。因此,正如您发现的那样,向 takeUntil
提供 delete$
会导致订阅 delete$
,从而启动请求。
如果您想保持代码的 structure/API 不变,实现此目的的一种方法是使用被视为某种 "notifier" 的主题。主题是多播和 "hot"(即使他们自己不做任何事情)。
function observe({ url, version, serviceName }) {
// etc...
const shutdown$ = new Subject(); // <---------------- create our notifier
return {
create() {
return create$
.retry(Number.POSITIVE_INFINITY)
.takeUntil(shutdown$); // <-------------------- take our notifier
},
delete({ interval = 5000, times = 0 } = {}) {
return Observable.defer(() => {
shutdown$.next(); // <------------------------- notify
return delete$.retry(times);
});
}
};
}
我们使用了 Observable.defer()
这样我们就可以在有人实际订阅我们 return.
shutdown$.next()
副作用
方法 #1
您可以在 .takeUntill
中使用 Observable,而无需通过创建中间体 Subject
实际订阅它。然后您可以订阅该主题而不是原始主题。是这样的:
const delete$ = new Subject();
...
create() {
return create$
.takeUntill(delete$);
},
delete() {
create(...).subscribe(delete$);
return delete;
}
方法 #2
但是在你的情况下,我认为创建主题会更好,它会在调用 .delete
时通知。像这样:
const onDelete$ = new Subject();
...
create() {
return create$
.takeUntill(onDelete$);
},
delete() {
onDelete$.next();
return ...
}