使用 subscription.next() 是一种反模式吗?

Is it an antipattern to use subscription.next()?

我有一个订阅,我想在一段中间件代码中使用它来发出事件信号。所以我有一件事想要订阅一个可观察的,另一件事不太可观察。

我考虑过为此使用 Subjects - 这似乎是它们的用途:

const repeater = new Rx.Subject();

function subscribe(observer) {
  return repeater.subscribe(observer);
}

// in some other function, call repeater.next(val)

但后来我开始研究常规 subscribe() 调用的内容 return。我可以这样做:

let sub = null;
function subscribe(observer) {
  return Rx.Observable.create((o) => sub = o);
}
// elsewhere sub.next(val)

还有?

let unsub = null;
function subscribe(observer) {
  unsub = Rx.Observable.create(() => false).subscribe(observer)
}
// elsewhere unsub.next(val)

所有这些都会向订阅者发出 val 信号。我在这里不明白的奇怪的事情是订阅 return 有一个 next() 可用 - 我认为 next() 只存在于 Observable 上下文中的观察者。

无论如何我都需要处理取消订阅的小玩意——当中间件被拆除时,我需要发出流完成信号并释放一些资源。令我惊讶的是,unsub 的下一个功能。

这向我发出信号,就 Observers、Observables 和 Subjects 等而言,我还没有完全理解 RxJS 的一些内容。总的来说,我了解如何将事件源和其他类似的东西连接到可观察的流中。它实际上只是在从无聊的函数调用中构建一个可观察流的上下文中——每当外部库调用该函数时,这个流就会发出一个更新的可观察对象的信号。

订阅者扩展订阅和观察者,添加状态。它公开了一种更改状态的方法(即 unsubscribe()),并且 它还公开了观察者的 next()/error()/complete() 方法, 但是这些方法现在既能保持状态又能改变状态。

所以,如果我给你一个裸观察者,你可以调用 next()/error()/complete() 以任何顺序,你想要的次数都可以,即使它会 在你给我的 complete().

打电话后,你再打电话给我的 next() 太糟糕了

另一方面,如果我给你一个包裹在 订户,现在有状态,如果您尝试调用 next() 你打电话给 complete() 之后的那个订阅者,我不会看到它。 如果你打电话unsubscribe(),我会超脱

当您调用订阅时,如

subscriber = Rx.Observable.create(fn).subscribe(observer);

你得到的是同一个观察者,而且只有那个观察者, 包裹在订阅者中。这就是您看到 next()/error()/complete() 方法的原因。但是这些方法通常供内部使用,如果你用它们来喂养观察者,它不会做你期望的事情:

let observerA = {
    next: (x) => console.log('A: value: ' + x),
    error: (x) => console.log('A: error: ' + x),
    complete: () => console.log('A: completed')
}
let observerB = {
    next: (x) => console.log('B: value: ' + x),
    error: (x) => console.log('B: error: ' + x),
    complete: () => console.log('B: completed')
}

let observable = Rx.Observable.create(() => false);
let subscriberA = observable.subscribe(observerA);
let subscriberB = observable.map(x => 10*x).subscribe(observerB);
subscriberA.next(1); // only feeds observerA
// => "A: value: 1"
subscriberB.next(2); // only feeds observerB
// => "B: value: 2"  // What?

很有可能,对于您的用例,您会

  1. 想要使用 Subject,因为它为您提供了 next()/error()/complete() 接口,可以让您提供运算符链的前端,
  2. 想要使用 Subject,因为它可以让您将相同的值提供给多个观察者,
  3. 忘记您刚刚了解的有关订阅者的知识,因为您不会使用 next()/error()/complete() 订阅者接口。相反,将 subscribe() 返回的对象仅视为订阅,并且仅在其上使用 unsubscribe() 方法。

所以:

let subject = new Rx.Subject();
let subscriptionA = subject.subscribe(observerA);
let subscriptionB = subject.map(x=>10*x).subscribe(observerB);
subject.next(3);
// => A: value: 3
// => B: value: 30
subscriptionA.unsubscribe()
subject.next(4);
// => B: value: 40
subscriptionB.unsubscribe()
subject.next(5);
// (no output)

另见