有没有我可以用来 "dispose" 管道中分配的资源的运算符?

Is there an operator than I can use to "dispose" a resource allocated in a pipeline?

假设我有一个 observable:

declare function allocateLeakyThing(input: string): { dispose: () => void; };

const somethingLeaky$ = input$.pipe(
  map(allocateLeakyThing)
);

其中 allocateLeakyThing() returns 必须通过调用其 dispose() 方法手动处理的资源,以防止内存泄漏。如果 input$ 发出一个新值,我应该处理之前分配的资源;如果这个 observable 被取消订阅,它也应该处理资源,如果它曾经被分配的话。

为了第一个目的,我可以写:

const somethingLeaky$ = input$.pipe(
  withLatestFrom(somethingLeaky$.pipe(startsWith(undefined))),
  tap(([input, somethingLeaky]) => somethingLeaky?.dispose()),
  map(([input])=>allocateLeakyThing(input))
);

第二个目的呢?有没有办法与这个可观察对象内联(例如不涉及使用另一个主题)?

您可以使用 finalize() 运算符,该运算符在处理链(取消订阅)时调用。

从 RxJS 7.3 开始。你也可以使用 tap({ finalize: () => {} }).

也许在您的情况下,您还可以使用 Observable 包装创建资源的逻辑(与 中的基本相同):

new Observable(observer => {
  // create resource
  // ...

  // Return teardown function that will be called when the Observable is unsubscribed.
  return () => resource.close();
});