使用 subscription.next() 是一种反模式吗?
Is it an antipattern to use subscription.next()?
我有一个订阅,我想在一段中间件代码中使用它来发出事件信号。所以我有一件事想要订阅一个可观察的,另一件事不太可观察。
我考虑过为此使用 Subjects - 这似乎是它们的用途:
const repeater = new Rx.Subject();
function subscribe(observer) {
return repeater.subscribe(observer);
}
// in some other function, call repeater.next(val)
但后来我开始研究常规 subscribe() 调用的内容 return。我可以这样做:
let sub = null;
function subscribe(observer) {
return Rx.Observable.create((o) => sub = o);
}
// elsewhere sub.next(val)
还有?
let unsub = null;
function subscribe(observer) {
unsub = Rx.Observable.create(() => false).subscribe(observer)
}
// elsewhere unsub.next(val)
所有这些都会向订阅者发出 val 信号。我在这里不明白的奇怪的事情是订阅 return 有一个 next()
可用 - 我认为 next()
只存在于 Observable 上下文中的观察者。
无论如何我都需要处理取消订阅的小玩意——当中间件被拆除时,我需要发出流完成信号并释放一些资源。令我惊讶的是,unsub 的下一个功能。
这向我发出信号,就 Observers、Observables 和 Subjects 等而言,我还没有完全理解 RxJS 的一些内容。总的来说,我了解如何将事件源和其他类似的东西连接到可观察的流中。它实际上只是在从无聊的函数调用中构建一个可观察流的上下文中——每当外部库调用该函数时,这个流就会发出一个更新的可观察对象的信号。
订阅者扩展订阅和观察者,添加状态。它公开了一种更改状态的方法(即 unsubscribe()
),并且
它还公开了观察者的 next()
/error()
/complete()
方法,
但是这些方法现在既能保持状态又能改变状态。
所以,如果我给你一个裸观察者,你可以调用 next()
/error()
/complete()
以任何顺序,你想要的次数都可以,即使它会
在你给我的 complete()
.
打电话后,你再打电话给我的 next()
太糟糕了
另一方面,如果我给你一个包裹在
订户,现在有状态,如果您尝试调用 next()
你打电话给 complete()
之后的那个订阅者,我不会看到它。
如果你打电话unsubscribe()
,我会超脱
当您调用订阅时,如
subscriber = Rx.Observable.create(fn).subscribe(observer);
你得到的是同一个观察者,而且只有那个观察者,
包裹在订阅者中。这就是您看到 next()
/error()
/complete()
方法的原因。但是这些方法通常供内部使用,如果你用它们来喂养观察者,它不会做你期望的事情:
let observerA = {
next: (x) => console.log('A: value: ' + x),
error: (x) => console.log('A: error: ' + x),
complete: () => console.log('A: completed')
}
let observerB = {
next: (x) => console.log('B: value: ' + x),
error: (x) => console.log('B: error: ' + x),
complete: () => console.log('B: completed')
}
let observable = Rx.Observable.create(() => false);
let subscriberA = observable.subscribe(observerA);
let subscriberB = observable.map(x => 10*x).subscribe(observerB);
subscriberA.next(1); // only feeds observerA
// => "A: value: 1"
subscriberB.next(2); // only feeds observerB
// => "B: value: 2" // What?
很有可能,对于您的用例,您会
- 想要使用 Subject,因为它为您提供了
next()
/error()
/complete()
接口,可以让您提供运算符链的前端,
- 想要使用 Subject,因为它可以让您将相同的值提供给多个观察者,
- 忘记您刚刚了解的有关订阅者的知识,因为您不会使用
next()
/error()
/complete()
订阅者接口。相反,将 subscribe()
返回的对象仅视为订阅,并且仅在其上使用 unsubscribe()
方法。
所以:
let subject = new Rx.Subject();
let subscriptionA = subject.subscribe(observerA);
let subscriptionB = subject.map(x=>10*x).subscribe(observerB);
subject.next(3);
// => A: value: 3
// => B: value: 30
subscriptionA.unsubscribe()
subject.next(4);
// => B: value: 40
subscriptionB.unsubscribe()
subject.next(5);
// (no output)
另见 。
我有一个订阅,我想在一段中间件代码中使用它来发出事件信号。所以我有一件事想要订阅一个可观察的,另一件事不太可观察。
我考虑过为此使用 Subjects - 这似乎是它们的用途:
const repeater = new Rx.Subject();
function subscribe(observer) {
return repeater.subscribe(observer);
}
// in some other function, call repeater.next(val)
但后来我开始研究常规 subscribe() 调用的内容 return。我可以这样做:
let sub = null;
function subscribe(observer) {
return Rx.Observable.create((o) => sub = o);
}
// elsewhere sub.next(val)
还有?
let unsub = null;
function subscribe(observer) {
unsub = Rx.Observable.create(() => false).subscribe(observer)
}
// elsewhere unsub.next(val)
所有这些都会向订阅者发出 val 信号。我在这里不明白的奇怪的事情是订阅 return 有一个 next()
可用 - 我认为 next()
只存在于 Observable 上下文中的观察者。
无论如何我都需要处理取消订阅的小玩意——当中间件被拆除时,我需要发出流完成信号并释放一些资源。令我惊讶的是,unsub 的下一个功能。
这向我发出信号,就 Observers、Observables 和 Subjects 等而言,我还没有完全理解 RxJS 的一些内容。总的来说,我了解如何将事件源和其他类似的东西连接到可观察的流中。它实际上只是在从无聊的函数调用中构建一个可观察流的上下文中——每当外部库调用该函数时,这个流就会发出一个更新的可观察对象的信号。
订阅者扩展订阅和观察者,添加状态。它公开了一种更改状态的方法(即 unsubscribe()
),并且
它还公开了观察者的 next()
/error()
/complete()
方法,
但是这些方法现在既能保持状态又能改变状态。
所以,如果我给你一个裸观察者,你可以调用 next()
/error()
/complete()
以任何顺序,你想要的次数都可以,即使它会
在你给我的 complete()
.
next()
太糟糕了
另一方面,如果我给你一个包裹在
订户,现在有状态,如果您尝试调用 next()
你打电话给 complete()
之后的那个订阅者,我不会看到它。
如果你打电话unsubscribe()
,我会超脱
当您调用订阅时,如
subscriber = Rx.Observable.create(fn).subscribe(observer);
你得到的是同一个观察者,而且只有那个观察者,
包裹在订阅者中。这就是您看到 next()
/error()
/complete()
方法的原因。但是这些方法通常供内部使用,如果你用它们来喂养观察者,它不会做你期望的事情:
let observerA = {
next: (x) => console.log('A: value: ' + x),
error: (x) => console.log('A: error: ' + x),
complete: () => console.log('A: completed')
}
let observerB = {
next: (x) => console.log('B: value: ' + x),
error: (x) => console.log('B: error: ' + x),
complete: () => console.log('B: completed')
}
let observable = Rx.Observable.create(() => false);
let subscriberA = observable.subscribe(observerA);
let subscriberB = observable.map(x => 10*x).subscribe(observerB);
subscriberA.next(1); // only feeds observerA
// => "A: value: 1"
subscriberB.next(2); // only feeds observerB
// => "B: value: 2" // What?
很有可能,对于您的用例,您会
- 想要使用 Subject,因为它为您提供了
next()
/error()
/complete()
接口,可以让您提供运算符链的前端, - 想要使用 Subject,因为它可以让您将相同的值提供给多个观察者,
- 忘记您刚刚了解的有关订阅者的知识,因为您不会使用
next()
/error()
/complete()
订阅者接口。相反,将subscribe()
返回的对象仅视为订阅,并且仅在其上使用unsubscribe()
方法。
所以:
let subject = new Rx.Subject();
let subscriptionA = subject.subscribe(observerA);
let subscriptionB = subject.map(x=>10*x).subscribe(observerB);
subject.next(3);
// => A: value: 3
// => B: value: 30
subscriptionA.unsubscribe()
subject.next(4);
// => B: value: 40
subscriptionB.unsubscribe()
subject.next(5);
// (no output)
另见