在订阅之间创建阻塞行为

Create a blocking behavior between subscriptions

我使用 defer.

从 promise 创建了一个 observable
let connect$ = defer(() => connectAsync());

这将在我每次订阅connect$时执行connectAsync函数。 但是我需要等到之前的订阅完成,直到新的订阅开始。

这是一些带有注释的代码,可以使我想要的行为更加清晰。

connect$.subscribe(); // --|>
connect$.subscribe(); //    --|>
connect$.subscribe(); //       --|>

有没有我可以用来实现行为的 rxjs 运算符或主题?

简短答案:

不,RxJS 库中没有这样的运算符

龙(呃)答案:

RxJS curries 运算符的方式的一个很酷的事情是创建你自己的运算符相当轻松,RxJS 会像处理任何其他运算符一样对待它。

例如:

function shareQueue<T>(): MonoTypeOperatorFunction<T>{
  let buffer: Observable<T> = EMPTY;
  return (s: Observable<T>) => defer(() => {
    buffer = concat(
      buffer.pipe(ignoreElements()),
      s
    ).pipe(
      share({
        resetOnError: () => EMPTY;
        resetOnComplete: () => EMPTY;
        resetOnRefCountZero: () => EMPTY;
      })
    );
    return buffer;
  });
}

然后你可以像这样使用它来获得(我认为的)你想要的:

const connect$ = defer(connectAsync).pipe(
  shareQueue()
);

connect$.subscribe();
connect$.subscribe();
connect$.subscribe();