有没有办法为 RXJS Observables 做一个析构函数?

Is there a way to make a destructor for RXJS Observables?

在我的 Angular 应用程序中,我想从服务器获取 SSE 事件,然后对结果进行处理。为此,我找到了一个解决方案,将 SSE EventSource 包装到一个 Observable 中。代码如下:


import { Injectable, NgZone } from '@angular/core';
import { Observable } from 'rxjs';

@Injectable({
  providedIn: 'root',
})
export class SseServiceService {
  constructor(private _zone: NgZone) {}

  /**
   * Creates an event source
   */
  getEventSource(url: string): EventSource {
    return new EventSource(url);
  }

  /**
   * Returns an event source stream from the url
   *
   * @param url url of the event source
   */
  getServerSentEvent(url: string) {
    return new Observable((observer) => {
      const eventSource = this.getEventSource(url);

      eventSource.onmessage = (event) => {
        this._zone.run(() => {
          observer.next(event);
        });
      };

      eventSource.onerror = (error) => {
        this._zone.run(() => {
          observer.error(error);
        });
      };
    });
  }
}


问题是:

当 Observable 被销毁时,我不应该调用 eventSource.close() 吗?

有没有办法将析构函数分配给用new Observable()[=28 制作的observables =]?

您可以选择 return 一个拆卸函数形成传递给构造函数的“订阅函数”:

return new Observable((observer) => {
  const eventSource = this.getEventSource(url);
  ...
  return () => eventSource.close();
})

还有 finalize()tap() 等运算符(在 RxJS 7+ 中),可让您在处理链时调用函数。

希望 is what you need. But in the worst case, if for some reason you can't do it that way, it's possible in modern environments to get a callback in most cases when an object is going to be removed from memory because it's no longer referenced by anything and garbage collection is being done using a FinalizationRegistry。但请注意文档中的警告:

Avoid where possible

Correct use of FinalizationRegistry takes careful thought, and it's best avoided if possible. It's also important to avoid relying on any specific behaviors not guaranteed by the specification. When, how, and whether garbage collection occurs is down to the implementation of any given JavaScript engine. Any behavior you observe in one engine may be different in another engine, in another version of the same engine, or even in a slightly different situation with the same version of the same engine. Garbage collection is a hard problem that JavaScript engine implementers are constantly refining and improving their solutions to.

(披露:我在与将其添加到 JavaScript 的提案背后的 TC39 成员协商后编写了这些文档。)

也就是说,在正常情况下,在现代环境中,如果发生 垃圾收集,您应该得到回调。以下是您的操作方式(请参阅评论):

export class SseServiceService {
    /**
     * A registry for `EventSource` cleanup, see `getServerSentEvent`.
     */
    private sseRegistry = new FinalizationRegistry((eventSource) => {
        eventSource.close();
    });

    constructor(private _zone: NgZone) { }

    /**
     * Creates an event source
     */
    getEventSource(url: string): EventSource {
        return new EventSource(url);
    }

    /**
     * Returns an event source stream from the url
     *
     * @param url url of the event source
     */
    getServerSentEvent(url: string) {
        const eventSource = this.getEventSource(url);
        const observable = new Observable((observer) => {
            eventSource.onmessage = (event) => {
                this._zone.run(() => {
                    observer.next(event);
                });
            };

            eventSource.onerror = (error) => {
                this._zone.run(() => {
                    observer.error(error);
                });
            };
        });
        // Register the observable (held weakly) and the
        // event source that will be passed to the cleanup
        // callback
        this.sseRegistry.register(observable, eventSource);
    }
}

下面是调用清理回调的实例(在现代环境中):

const registry = new FinalizationRegistry((heldValue) => {
    console.log(`Cleanup callback for ${heldValue} was called`);
});

console.log("Adding objects...");
let o1 = Array.from({length: 10_000}, () => ({}));
registry.register(o1, "o1");
let o2 = Array.from({length: 10_000}, () => ({}));
registry.register(o2, "o2");
let o3 = Array.from({length: 10_000}, () => ({}));
registry.register(o3, "o3");

setTimeout(() => {
    console.log("Releasing o1 and o3...");
    o1 = null;
    o3 = null;
}, 800);
setTimeout(() => {
    console.log("Encouraging garbage collection by allocating a large object");
    Array.from({length: 1_000_000}, () => ({}));
}, 1600);

对我来说,使用带有 V8 引擎的 Chromium 浏览器,我根本看不到清理回调(好吧,我等了大约一分钟),除非我们通过分配一个大对象来“鼓励”垃圾收集。相比之下,对于 Firefox 中的 SpiderMonkey,我看到即使没有鼓励垃圾收集,也会在几秒钟的延迟后进行清理。这强调了清理回调如何比对象不再被引用(或根本不被引用)晚得多被调用,以及它如何因实现而异。

所以,只有在找不到任何其他方法时才这样做。希望您可以改用 或类似的东西。