如何向 Observable 订阅返回的 Disposable 添加动作函数?

How to add action function to Disposable returned from subscribe on an Observable?

你有一个 Observable,然后你订阅它。 return 值是一次性的。如何添加一个函数,以便在 Disposable 实例被释放时调用?

clickStream.subscribe(....).dispose();

调用 dispose() 时我想调用一个调试函数,例如 console.log('Disposing ....')。在真正的应用程序中,我想做一些非 rxjs 清理和 UI 通知操作。可能很简单,但我没有在 API 中看到它。谢谢

我想知道 finally 是否适用于您的用例 (documentation)。根据文档,Invokes a specified action after the source observable sequence terminates gracefully or exceptionally。请注意,这与调用 dispose 时调用的回调不完全相同,但我认为这可能足以满足您的需求,您可以将其用于 clean-up 操作。

更新

也接近您想要的,但不是很接近,您可以使用 using 运算符 (documentation)。它允许使用 dispose 方法创建一个对象,当对关联的可观察对象的订阅被销毁时,该方法将被调用。

更新 2

查看 source codefinally 实际上在处理对其可观察对象的订阅时执行操作。这包括终止或手动调用订阅 dispose()。这似乎非常接近您想要实现的目标。

最好是测试一下。请让我们了解最新结果。

一次性物品与事件源相关联。原则是事件源正在与 non-Rx 世界交互,并且当没有人订阅时可能需要做一些 clean-up。

如果你可以在 Disposable 中插入一个钩子,它是从 subscribe 的调用返回的,那么它就没有多大用处:你知道什么时候 调用了 dispose() 所以你可以在那里处理 clean-up 逻辑,如果一些 other 代码调用 dispose() 它可能在Disposable 进一步向下,这可能不会影响您的订阅。

听起来您正在尝试使用 side-effects,这不一定符合 RxJS API 的精神。如果你需要关闭一些网络连接或类似的东西,那么也许你应该使用一个自定义的 Observable,它创建一个 Disposable 来清理自己。

一个简单的示例(Observable.fromEvent 的简化实现)可能如下所示:

function fromEvent(obj, evt){
     var subject = new Rx.Subject();
     function listener(e){
        subject.onNext( e );
     }
     return Rx.Observable.create( function( observer ){
        var disp = subject.subscribe( observer );
          obj.addEventListener( evt, listener );
          return Rx.Disposable.create(function(){
             // All the clean-up code goes here
             obj.removeEventListener( evt, listener );
             disp.dispose();
          })
     });     
}


var events$ = fromEvent( button, 'click');
var count = 0;
var unsub = events$.subscribe( function(){
    console.log('click');
    count++;
    if( count > 5){
        unsub.dispose();
    }
})