防止 RxJS 中的异步管道运算符过早完成
Preventing premature completion of an async pipeable operator in RxJS
我正在使用 RxJS 6 创建 pipeable operators,并且不清楚在操作 异步.[=25 时如何 complete()
观察者=]
对于同步操作,逻辑很简单。在下面的示例中,来自源 Observable
的所有值都将传递给 observer.next()
,然后调用 observer.complete()
。
const syncOp = () => (source) =>
new rxjs.Observable(observer => {
return source.subscribe({
next: (x) => observer.next(x),
error: (e) => observer.error(err),
complete: () => observer.complete()
})
});
rxjs.from([1, 2, 3]).pipe(syncOp()).subscribe(x => console.log(x));
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/6.2.2/rxjs.umd.min.js">
</script>
然而,对于异步操作,我有点不知所措。在下面的示例中,异步操作由对 setTimeout()
的调用表示。显然,observer.complete()
将在 任何值传递给 observer.next()
.
之前被调用
const asyncOp = () => (source) =>
new rxjs.Observable(observer => {
return source.subscribe({
next: (x) => setTimeout(() => observer.next(x), 100),
error: (e) => observer.error(err),
complete: () => observer.complete()
})
});
rxjs.from([1, 2, 3]).pipe(asyncOp()).subscribe(x => console.log(x));
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/6.2.2/rxjs.umd.min.js">
</script>
所以问题是:惯用的 RxJS 方法是什么才能使对 observer.complete()
的调用仅在所有值都异步传递给 observer.next()
之后才进行?我应该手动跟踪未决呼叫还是有更多 "reactive" 解决方案?
(请注意,上面的示例是我实际代码的简化,对 setTimeout()
的调用旨在表示 "any asynchronous operation"。我正在寻找一个在管道操作符中处理异步操作的一般方法,而不是关于如何处理 RxJS 中的延迟或超时的建议。)
仍然希望获得关于更多 reactive/idiomatic 实施的意见,但下面是我决定暂时采用的方法。
本质上,我只是为飞行中的操作使用了一个计数器 (pending
) 并使操作员仅在源可观察对象完成时才完成 (completed
) 并且没有待处理的操作 (!pending
)。
const asyncOp = () => (source) =>
new rxjs.Observable(observer => {
let pending = 0; // the number of in-flight operations
let completed = false; // whether or not the source observable completed
return source.subscribe({
next: (x) => {
pending++;
setTimeout(() => {
observer.next(x);
if (!--pending && completed) { // no ops pending and source completed
observer.complete();
}
}, 100);
},
error: (e) => observer.error(err),
complete: () => {
completed = true;
if (!pending) { // no ops pending
observer.complete();
}
}
})
});
rxjs.from([1, 2, 3]).pipe(asyncOp()).subscribe(x => console.log(x));
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/6.2.2/rxjs.umd.min.js">
</script>
我创建了这个 runnable StackBlitz demo 来展示我认为应该做的事情。
这里的想法是使用 toArray()
将源可观察到的所有值获取到一个数组中。 toArray()
后面的代码是单值(数组)。
注意:有很多方法(运算符)可以解决问题,这只是基于我从这个问题中理解的一个例子——这对 RxJS Observables 来说既是好事也是坏事。希望这可以帮助。 :-)
主要演示代码为:
// --- for each value, do the async service
of(...[1, 2, 3]).pipe(
// let each value be processed by both async service...
concatMap(no => myAsyncService$(no)),
concatMap(no => myAsyncService2$(no)),
// --- toArray() combines all the values (i.e. they completed)
toArray(),
// --- this will only be called once - with all completed values
// --- testing: try commenting the toArray() to see the values as individual "next" value
tap(val => {
// see the combined values
console.log(val)
})
).subscribe();
一种思路可能是重组您的 asyncOp
以使用其他运算符,例如 mergeMap
.
这是使用此方法重现您的示例的代码
const asyncOp = () => source => source.pipe(mergeMap(x => of(x).pipe(delay(100))));
from([1, 2, 3]).pipe(asyncOp1()).subscribe(x => console.log(x));
这是否值得考虑取决于您的 asyncOp
所做的事情。如果它是异步的,因为它依赖于某些回调,例如 https 调用或从文件系统读取的情况,那么我认为这种方法可以工作,因为您可以将基于回调的函数转换为 Observable。
我正在使用 RxJS 6 创建 pipeable operators,并且不清楚在操作 异步.[=25 时如何 complete()
观察者=]
对于同步操作,逻辑很简单。在下面的示例中,来自源 Observable
的所有值都将传递给 observer.next()
,然后调用 observer.complete()
。
const syncOp = () => (source) =>
new rxjs.Observable(observer => {
return source.subscribe({
next: (x) => observer.next(x),
error: (e) => observer.error(err),
complete: () => observer.complete()
})
});
rxjs.from([1, 2, 3]).pipe(syncOp()).subscribe(x => console.log(x));
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/6.2.2/rxjs.umd.min.js">
</script>
然而,对于异步操作,我有点不知所措。在下面的示例中,异步操作由对 setTimeout()
的调用表示。显然,observer.complete()
将在 任何值传递给 observer.next()
.
const asyncOp = () => (source) =>
new rxjs.Observable(observer => {
return source.subscribe({
next: (x) => setTimeout(() => observer.next(x), 100),
error: (e) => observer.error(err),
complete: () => observer.complete()
})
});
rxjs.from([1, 2, 3]).pipe(asyncOp()).subscribe(x => console.log(x));
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/6.2.2/rxjs.umd.min.js">
</script>
所以问题是:惯用的 RxJS 方法是什么才能使对 observer.complete()
的调用仅在所有值都异步传递给 observer.next()
之后才进行?我应该手动跟踪未决呼叫还是有更多 "reactive" 解决方案?
(请注意,上面的示例是我实际代码的简化,对 setTimeout()
的调用旨在表示 "any asynchronous operation"。我正在寻找一个在管道操作符中处理异步操作的一般方法,而不是关于如何处理 RxJS 中的延迟或超时的建议。)
仍然希望获得关于更多 reactive/idiomatic 实施的意见,但下面是我决定暂时采用的方法。
本质上,我只是为飞行中的操作使用了一个计数器 (pending
) 并使操作员仅在源可观察对象完成时才完成 (completed
) 并且没有待处理的操作 (!pending
)。
const asyncOp = () => (source) =>
new rxjs.Observable(observer => {
let pending = 0; // the number of in-flight operations
let completed = false; // whether or not the source observable completed
return source.subscribe({
next: (x) => {
pending++;
setTimeout(() => {
observer.next(x);
if (!--pending && completed) { // no ops pending and source completed
observer.complete();
}
}, 100);
},
error: (e) => observer.error(err),
complete: () => {
completed = true;
if (!pending) { // no ops pending
observer.complete();
}
}
})
});
rxjs.from([1, 2, 3]).pipe(asyncOp()).subscribe(x => console.log(x));
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/6.2.2/rxjs.umd.min.js">
</script>
我创建了这个 runnable StackBlitz demo 来展示我认为应该做的事情。
这里的想法是使用 toArray()
将源可观察到的所有值获取到一个数组中。 toArray()
后面的代码是单值(数组)。
注意:有很多方法(运算符)可以解决问题,这只是基于我从这个问题中理解的一个例子——这对 RxJS Observables 来说既是好事也是坏事。希望这可以帮助。 :-)
主要演示代码为:
// --- for each value, do the async service
of(...[1, 2, 3]).pipe(
// let each value be processed by both async service...
concatMap(no => myAsyncService$(no)),
concatMap(no => myAsyncService2$(no)),
// --- toArray() combines all the values (i.e. they completed)
toArray(),
// --- this will only be called once - with all completed values
// --- testing: try commenting the toArray() to see the values as individual "next" value
tap(val => {
// see the combined values
console.log(val)
})
).subscribe();
一种思路可能是重组您的 asyncOp
以使用其他运算符,例如 mergeMap
.
这是使用此方法重现您的示例的代码
const asyncOp = () => source => source.pipe(mergeMap(x => of(x).pipe(delay(100))));
from([1, 2, 3]).pipe(asyncOp1()).subscribe(x => console.log(x));
这是否值得考虑取决于您的 asyncOp
所做的事情。如果它是异步的,因为它依赖于某些回调,例如 https 调用或从文件系统读取的情况,那么我认为这种方法可以工作,因为您可以将基于回调的函数转换为 Observable。