RxJs:在 flatMapLatest 完成后访问 flatMapLatest 之前的数据
RxJs: access data before flatMapLatest after flatMapLatest finished
场景:
- 用户使用组合成单个流的过滤器
- 当过滤器发生变化时,将触发后端事件以获取 "cheap" 数据
- 当 "cheap" 数据到达时,另一个具有相同参数的请求被触发到不同的端点,即 returns "expensive" 数据将用于丰富廉价数据。请求应延迟 1 秒,并且仅在用户未更改任何过滤器时才会触发(否则应等待 1 秒)
我正在努力解决 3) 没有中间变量的选项。
let filterStream = Rx.Observable
.combineLatest(
filterX,
filterY,
(filterX, filterY) => {
x: filterX,
y: filterY
}
)
.map((filters) => {
limit: 100,
s: filters.x.a,
f: filters.x.b + filters.y.c,
})
.distinctUntilChanged()
let cheapDataStream = filterStream
.flatMapLatest((filterQuery) =>
Rx.Observable.fromPromise(cheapBackendApiCall(filterQuery)))
// render cheap results
cheapDataStream
.map(result => transformForDisplay(result))
.subscribe(result => {
//render
// how do i invoke expensiveApiCall() with `filterQuery` data here?
// with a delay, and only if filterQuery has not changed?
});
你在找debounce吗?
该运算符似乎完全符合您的描述。
我想出的解决方案,虽然不确定 do() 是否合适,而且便宜 + 昂贵的数据合并看起来不太反应。
Rx.Observable
.combineLatest(
filterX,
filterY,
(filterX, filterY) => {
x: filterX,
y: filterY
}
)
.map((filters) => {
limit: 100,
s: filters.x.a,
f: filters.x.b + filters.y.c,
})
.distinctUntilChanged()
.flatMapLatest((filterQuery) =>
Rx
.Observable
.fromPromise(cheapBackendApiCall(filterQuery))
.map((results) => {
filterQuery: filterQuery,
results: results
})
)
.do((filtersAndResults) => {
// render filtersAndResults.results
})
.debounce(1500)
.flatMapLatest((filtersAndResults) => {
return Rx
.Observable
.fromPromise(expensiveBackendApiCall(filtersAndResults.filterQuery))
.map(results => {
expensiveData: results,
cheapData: filtersAndResults.results
})
})
.subscribe((result)=> {
// combine results.cheapData + results.expensiveData with simple .map and .find
// and render
})
您可以利用隐式转换来避免在任何地方显式使用 fromPromise
。然后你可以使用 concat 到 return 首先是便宜的数据,然后是延迟的昂贵+便宜的数据。通过将其嵌套在 flatMapLatest
中,如果新查询到达,流也将取消任何未决的 expensiveCalls
。
var filters = Rx.Observable
.combineLatest(
filterX,
filterY,
(filterX, filterY) => {
x: filterX,
y: filterY
}
)
.map((filters) => {
limit: 100,
s: filters.x.a,
f: filters.x.b + filters.y.c,
})
.distinctUntilChanged()
.flatMapLatest(filters => {
//This kicks off immediately
var cheapPromise = cheapBackendApiCall(filters);
//This was added in the latest version 4.1, the function is only called once it is subscribed to,
//if you are using earlier you will need to wrap it in a defer instead.
var expensivePromiseFn = () => expensiveBackendApiCall(filters);
//For join implicitly calls `fromPromise` so you can pass the same
// sort of arguments.
var cheapAndExpensive = Rx.Observable.forkJoin(
cheapPromise,
expensivePromiseFn,
(cheap, expensive) => ({cheap, expensive}));
//First return the cheap, then wait 1500 millis before subscribing
//which will trigger the expensive operation and join it with the result of the cheap one
//The parent `flatMapLatest` guarantees that this cancels if a new event comes in
return Rx.Observable.concat(cheap, cheapAndExpensive.delaySubscription(1500));
})
.subscribe(x => /*Render results*/);
场景:
- 用户使用组合成单个流的过滤器
- 当过滤器发生变化时,将触发后端事件以获取 "cheap" 数据
- 当 "cheap" 数据到达时,另一个具有相同参数的请求被触发到不同的端点,即 returns "expensive" 数据将用于丰富廉价数据。请求应延迟 1 秒,并且仅在用户未更改任何过滤器时才会触发(否则应等待 1 秒)
我正在努力解决 3) 没有中间变量的选项。
let filterStream = Rx.Observable
.combineLatest(
filterX,
filterY,
(filterX, filterY) => {
x: filterX,
y: filterY
}
)
.map((filters) => {
limit: 100,
s: filters.x.a,
f: filters.x.b + filters.y.c,
})
.distinctUntilChanged()
let cheapDataStream = filterStream
.flatMapLatest((filterQuery) =>
Rx.Observable.fromPromise(cheapBackendApiCall(filterQuery)))
// render cheap results
cheapDataStream
.map(result => transformForDisplay(result))
.subscribe(result => {
//render
// how do i invoke expensiveApiCall() with `filterQuery` data here?
// with a delay, and only if filterQuery has not changed?
});
你在找debounce吗?
该运算符似乎完全符合您的描述。
我想出的解决方案,虽然不确定 do() 是否合适,而且便宜 + 昂贵的数据合并看起来不太反应。
Rx.Observable
.combineLatest(
filterX,
filterY,
(filterX, filterY) => {
x: filterX,
y: filterY
}
)
.map((filters) => {
limit: 100,
s: filters.x.a,
f: filters.x.b + filters.y.c,
})
.distinctUntilChanged()
.flatMapLatest((filterQuery) =>
Rx
.Observable
.fromPromise(cheapBackendApiCall(filterQuery))
.map((results) => {
filterQuery: filterQuery,
results: results
})
)
.do((filtersAndResults) => {
// render filtersAndResults.results
})
.debounce(1500)
.flatMapLatest((filtersAndResults) => {
return Rx
.Observable
.fromPromise(expensiveBackendApiCall(filtersAndResults.filterQuery))
.map(results => {
expensiveData: results,
cheapData: filtersAndResults.results
})
})
.subscribe((result)=> {
// combine results.cheapData + results.expensiveData with simple .map and .find
// and render
})
您可以利用隐式转换来避免在任何地方显式使用 fromPromise
。然后你可以使用 concat 到 return 首先是便宜的数据,然后是延迟的昂贵+便宜的数据。通过将其嵌套在 flatMapLatest
中,如果新查询到达,流也将取消任何未决的 expensiveCalls
。
var filters = Rx.Observable
.combineLatest(
filterX,
filterY,
(filterX, filterY) => {
x: filterX,
y: filterY
}
)
.map((filters) => {
limit: 100,
s: filters.x.a,
f: filters.x.b + filters.y.c,
})
.distinctUntilChanged()
.flatMapLatest(filters => {
//This kicks off immediately
var cheapPromise = cheapBackendApiCall(filters);
//This was added in the latest version 4.1, the function is only called once it is subscribed to,
//if you are using earlier you will need to wrap it in a defer instead.
var expensivePromiseFn = () => expensiveBackendApiCall(filters);
//For join implicitly calls `fromPromise` so you can pass the same
// sort of arguments.
var cheapAndExpensive = Rx.Observable.forkJoin(
cheapPromise,
expensivePromiseFn,
(cheap, expensive) => ({cheap, expensive}));
//First return the cheap, then wait 1500 millis before subscribing
//which will trigger the expensive operation and join it with the result of the cheap one
//The parent `flatMapLatest` guarantees that this cancels if a new event comes in
return Rx.Observable.concat(cheap, cheapAndExpensive.delaySubscription(1500));
})
.subscribe(x => /*Render results*/);