序列化 Observables
Serialising Observables
我想序列化返回可观察对象的函数的执行。我已经设法做到了这一点,但这不是一个非常务实的解决方案。实现以下目标的反应性更强的方法是什么?
import Rx from 'rx'
export default class Executor {
constructor() {
this.queue = []
this.draining = false
}
run(fn) {
return Rx.Observable.create(o => {
this.queue.push(Rx.Observable.defer(() =>
fn()
.doOnNext((value) => o.onNext(value))
.doOnError((err) => o.onError(err))
.doOnCompleted(() => o.onCompleted())))
async () => {
if (this.draining) {
return
}
this.draining = true
while (this.queue.length > 0) {
try {
await this.queue.shift().toPromise()
} catch(err) {
// Do nothing...
}
}
this.draining = false
}()
})
}
}
不完全确定您要实现的目标,但听起来您在使用 flatMapWithMaxConcurrent 运算符。
import Rx from 'rx'
export default class Executor {
constructor() {
// Queue of: () => Observable
this.queue = new Rx.Subject();
this.queue
.flatMapWithMaxConcurrent(1,
fn => fn().catch(Rx.Observable.empty()) // Replace with proper error handling
)
.subscribe(result => console.log(result));
}
push(fn) {
this.queue.onNext(fn);
}
}
我想序列化返回可观察对象的函数的执行。我已经设法做到了这一点,但这不是一个非常务实的解决方案。实现以下目标的反应性更强的方法是什么?
import Rx from 'rx'
export default class Executor {
constructor() {
this.queue = []
this.draining = false
}
run(fn) {
return Rx.Observable.create(o => {
this.queue.push(Rx.Observable.defer(() =>
fn()
.doOnNext((value) => o.onNext(value))
.doOnError((err) => o.onError(err))
.doOnCompleted(() => o.onCompleted())))
async () => {
if (this.draining) {
return
}
this.draining = true
while (this.queue.length > 0) {
try {
await this.queue.shift().toPromise()
} catch(err) {
// Do nothing...
}
}
this.draining = false
}()
})
}
}
不完全确定您要实现的目标,但听起来您在使用 flatMapWithMaxConcurrent 运算符。
import Rx from 'rx'
export default class Executor {
constructor() {
// Queue of: () => Observable
this.queue = new Rx.Subject();
this.queue
.flatMapWithMaxConcurrent(1,
fn => fn().catch(Rx.Observable.empty()) // Replace with proper error handling
)
.subscribe(result => console.log(result));
}
push(fn) {
this.queue.onNext(fn);
}
}