RxJs 中是否有 "async" 版本的过滤器运算符?
Is there an "async" version of filter operator in RxJs?
我需要通过针对某些 Web 服务检查条目来过滤 observable 发出的条目。普通的 observable.filter 运算符在这里不适用,因为它期望谓词函数同步 return 判断,但在这种情况下,只能异步检索判断。
我可以通过以下代码进行转换,但我想知道是否有更好的运算符可以用于这种情况。
someObservable.flatmap(function(entry) {
return Rx.Observable.fromNodeCallback(someAsynCheckFunc)(entry).map(function(verdict) {
return {
verdict: verdict,
entry: entry
};
});
}).filter(function(obj) {
return obj.verdict === true;
}).map(function(obj) {
return obj.entry;
});
我不知道。你可以自己动手。我还没有对此进行测试,但这是一个想法:
Observable.prototype.flatFilter = function(predicate) {
var self = this;
return Observable.create(function(obs) {
var disposable = new CompositeDisposable();
disposable.add(self.subscribe(function(x) {
disposable.add(predicate(x).subscribe(function(result) {
if(result) {
obs.onNext(x);
}
}, obs.onError.bind(obs)));
}, obs.onError.bind(obs), obs.onCompleted.bind(obs)));
return disposable;
});
};
你可以这样使用它:
someStream.flatFilter(function(x) {
return Rx.DOM.get('/isValid?check=' + x);
}).subscribe(function(x) {
console.log(x);
});
以下是您如何使用现有运算符实现此类运算符。您需要考虑一个问题。因为您的过滤操作是异步的,所以新项目到达的速度可能比您的过滤操作处理它们的速度更快。在这种情况下应该怎么办?您想要按顺序 运行 过滤器并保证您的项目顺序得到维护吗?您想要 运行 并行过滤并接受您的项目可能以不同的顺序出现吗?
这是运算符的两个版本
// runs the filters in parallel (order not guaranteed)
// predicate should return an Observable
Rx.Observable.prototype.flatFilter = function (predicate) {
return this.flatMap(function (value, index) {
return predicate(value, index)
.filter(Boolean) // filter falsy values
.map(function () { return value; });
});
};
// runs the filters sequentially (order preserved)
// predicate should return an Observable
Rx.Observable.prototype.concatFilter = function (predicate) {
return this.concatMap(function (value, index) {
return predicate(value, index)
.filter(Boolean) // filter falsy values
.map(function () { return value; });
});
};
用法:
var predicate = Rx.Observable.fromNodeCallback(someAsynCheckFunc);
someObservable.concatFilter(predicate).subscribe(...);
RxJS 6+ 更新
从 RxJS 6.0 版开始,我们使用管道运算符代替可观察原型方法链。
所以我将此请求的原始代码更新为 RxJS 6 管道样式,并根据接受的答案中的信息进行了改进。
更新 2
我发布了这个包的 1.0.0 版:
https://www.npmjs.com/package/filter-async-rxjs-pipe/v/1.0.0
该软件包现在每周似乎有 90 到 300 次下载,所以它看起来非常稳定,我相信它会完成它的工作。
更新 1
我现在将这段代码重构为一个 npm 包。
https://www.npmjs.com/package/filter-async-rxjs-pipe
带有 concatMap 的串行变体已经可以正常工作,带有 flatMap 的并行变体目前似乎无法 运行 并行。
但是因为我需要 concatMap 版本,所以我目前拥有我所需要的一切。
如果有人知道如何正确编写并行版本,请在连接的 Git 存储库中添加一个问题。 :)
备注
因为我只需要传递一个 returns 一个 Promise 的谓词函数,
我将 Promise 到 Observable 的转换直接写到 filterAsync 方法中。如果您需要将 Observable 作为过滤器输入,请随意调整代码。
export function filterAsync<T>(predicate: (value: T, index: number) => Promise<boolean>): MonoTypeOperatorFunction<T> {
let count = 0;
return pipe(
// Convert the predicate Promise<boolean> to an observable (which resolves the promise,
// Then combine the boolean result of the promise with the input data to a container object
concatMap((data: T) => {
return from(predicate(data, count++))
.pipe(map((isValid) => ({filterResult: isValid, entry: data})));
}),
// Filter the container object synchronously for the value in each data container object
filter(data => data.filterResult === true),
// remove the data container object from the observable chain
map(data => data.entry)
);
}
这是一个包含完整 ts 文件代码的要点,包括导入:
https://gist.github.com/bjesuiter/288326f9822e0bc82389976f8da66dd8#file-filter-async-ts
这是我认为非常干净的 pipe-able 运算符解决方案。
function flatFilter<T>(
predicate: (value: T, index: number) => Observable<boolean>,
thisArg?: unknown): MonoTypeOperatorFunction<T> {
return flatMap(
(v, i) => predicate.call(thisArg, v, i).pipe(filter(b => b), mapTo(v)));
}
concatMap
或 switchMap
可以轻松交换(flatMap
是 mergeMap
)。
我需要通过针对某些 Web 服务检查条目来过滤 observable 发出的条目。普通的 observable.filter 运算符在这里不适用,因为它期望谓词函数同步 return 判断,但在这种情况下,只能异步检索判断。
我可以通过以下代码进行转换,但我想知道是否有更好的运算符可以用于这种情况。
someObservable.flatmap(function(entry) {
return Rx.Observable.fromNodeCallback(someAsynCheckFunc)(entry).map(function(verdict) {
return {
verdict: verdict,
entry: entry
};
});
}).filter(function(obj) {
return obj.verdict === true;
}).map(function(obj) {
return obj.entry;
});
我不知道。你可以自己动手。我还没有对此进行测试,但这是一个想法:
Observable.prototype.flatFilter = function(predicate) {
var self = this;
return Observable.create(function(obs) {
var disposable = new CompositeDisposable();
disposable.add(self.subscribe(function(x) {
disposable.add(predicate(x).subscribe(function(result) {
if(result) {
obs.onNext(x);
}
}, obs.onError.bind(obs)));
}, obs.onError.bind(obs), obs.onCompleted.bind(obs)));
return disposable;
});
};
你可以这样使用它:
someStream.flatFilter(function(x) {
return Rx.DOM.get('/isValid?check=' + x);
}).subscribe(function(x) {
console.log(x);
});
以下是您如何使用现有运算符实现此类运算符。您需要考虑一个问题。因为您的过滤操作是异步的,所以新项目到达的速度可能比您的过滤操作处理它们的速度更快。在这种情况下应该怎么办?您想要按顺序 运行 过滤器并保证您的项目顺序得到维护吗?您想要 运行 并行过滤并接受您的项目可能以不同的顺序出现吗?
这是运算符的两个版本
// runs the filters in parallel (order not guaranteed)
// predicate should return an Observable
Rx.Observable.prototype.flatFilter = function (predicate) {
return this.flatMap(function (value, index) {
return predicate(value, index)
.filter(Boolean) // filter falsy values
.map(function () { return value; });
});
};
// runs the filters sequentially (order preserved)
// predicate should return an Observable
Rx.Observable.prototype.concatFilter = function (predicate) {
return this.concatMap(function (value, index) {
return predicate(value, index)
.filter(Boolean) // filter falsy values
.map(function () { return value; });
});
};
用法:
var predicate = Rx.Observable.fromNodeCallback(someAsynCheckFunc);
someObservable.concatFilter(predicate).subscribe(...);
RxJS 6+ 更新
从 RxJS 6.0 版开始,我们使用管道运算符代替可观察原型方法链。
所以我将此请求的原始代码更新为 RxJS 6 管道样式,并根据接受的答案中的信息进行了改进。
更新 2
我发布了这个包的 1.0.0 版:
https://www.npmjs.com/package/filter-async-rxjs-pipe/v/1.0.0
该软件包现在每周似乎有 90 到 300 次下载,所以它看起来非常稳定,我相信它会完成它的工作。
更新 1
我现在将这段代码重构为一个 npm 包。
https://www.npmjs.com/package/filter-async-rxjs-pipe
带有 concatMap 的串行变体已经可以正常工作,带有 flatMap 的并行变体目前似乎无法 运行 并行。 但是因为我需要 concatMap 版本,所以我目前拥有我所需要的一切。 如果有人知道如何正确编写并行版本,请在连接的 Git 存储库中添加一个问题。 :)
备注
因为我只需要传递一个 returns 一个 Promise 的谓词函数,
我将 Promise 到 Observable 的转换直接写到 filterAsync 方法中。如果您需要将 Observable 作为过滤器输入,请随意调整代码。
export function filterAsync<T>(predicate: (value: T, index: number) => Promise<boolean>): MonoTypeOperatorFunction<T> {
let count = 0;
return pipe(
// Convert the predicate Promise<boolean> to an observable (which resolves the promise,
// Then combine the boolean result of the promise with the input data to a container object
concatMap((data: T) => {
return from(predicate(data, count++))
.pipe(map((isValid) => ({filterResult: isValid, entry: data})));
}),
// Filter the container object synchronously for the value in each data container object
filter(data => data.filterResult === true),
// remove the data container object from the observable chain
map(data => data.entry)
);
}
这是一个包含完整 ts 文件代码的要点,包括导入:
https://gist.github.com/bjesuiter/288326f9822e0bc82389976f8da66dd8#file-filter-async-ts
这是我认为非常干净的 pipe-able 运算符解决方案。
function flatFilter<T>(
predicate: (value: T, index: number) => Observable<boolean>,
thisArg?: unknown): MonoTypeOperatorFunction<T> {
return flatMap(
(v, i) => predicate.call(thisArg, v, i).pipe(filter(b => b), mapTo(v)));
}
concatMap
或 switchMap
可以轻松交换(flatMap
是 mergeMap
)。