在订阅之间创建阻塞行为
Create a blocking behavior between subscriptions
我使用 defer
.
从 promise 创建了一个 observable
let connect$ = defer(() => connectAsync());
这将在我每次订阅connect$
时执行connectAsync
函数。
但是我需要等到之前的订阅完成,直到新的订阅开始。
这是一些带有注释的代码,可以使我想要的行为更加清晰。
connect$.subscribe(); // --|>
connect$.subscribe(); // --|>
connect$.subscribe(); // --|>
有没有我可以用来实现行为的 rxjs 运算符或主题?
简短答案:
不,RxJS 库中没有这样的运算符
龙(呃)答案:
RxJS curries 运算符的方式的一个很酷的事情是创建你自己的运算符相当轻松,RxJS 会像处理任何其他运算符一样对待它。
例如:
function shareQueue<T>(): MonoTypeOperatorFunction<T>{
let buffer: Observable<T> = EMPTY;
return (s: Observable<T>) => defer(() => {
buffer = concat(
buffer.pipe(ignoreElements()),
s
).pipe(
share({
resetOnError: () => EMPTY;
resetOnComplete: () => EMPTY;
resetOnRefCountZero: () => EMPTY;
})
);
return buffer;
});
}
然后你可以像这样使用它来获得(我认为的)你想要的:
const connect$ = defer(connectAsync).pipe(
shareQueue()
);
connect$.subscribe();
connect$.subscribe();
connect$.subscribe();
我使用 defer
.
let connect$ = defer(() => connectAsync());
这将在我每次订阅connect$
时执行connectAsync
函数。
但是我需要等到之前的订阅完成,直到新的订阅开始。
这是一些带有注释的代码,可以使我想要的行为更加清晰。
connect$.subscribe(); // --|>
connect$.subscribe(); // --|>
connect$.subscribe(); // --|>
有没有我可以用来实现行为的 rxjs 运算符或主题?
简短答案:
不,RxJS 库中没有这样的运算符
龙(呃)答案:
RxJS curries 运算符的方式的一个很酷的事情是创建你自己的运算符相当轻松,RxJS 会像处理任何其他运算符一样对待它。
例如:
function shareQueue<T>(): MonoTypeOperatorFunction<T>{
let buffer: Observable<T> = EMPTY;
return (s: Observable<T>) => defer(() => {
buffer = concat(
buffer.pipe(ignoreElements()),
s
).pipe(
share({
resetOnError: () => EMPTY;
resetOnComplete: () => EMPTY;
resetOnRefCountZero: () => EMPTY;
})
);
return buffer;
});
}
然后你可以像这样使用它来获得(我认为的)你想要的:
const connect$ = defer(connectAsync).pipe(
shareQueue()
);
connect$.subscribe();
connect$.subscribe();
connect$.subscribe();