RxJs:通过 API 递归分页并从列表中查找值
RxJs: Paginate through API recursively and find value from list
我正在使用 rxjs v6.4.0。我正在尝试通过 API 搜索名称等于 "development" 的非常具体的频道进行分页。我正在使用 expand
递归调用 API 并获取新页面。最终结果为我提供了一个串联的频道列表。然后我过滤掉名称不等于 "development" 的所有频道。但是我收到一个错误:TypeError: You provided 'undefined' where a stream was expected. You can provide an Observable, Promise, Array, or Iterable.
const Rx = require('rxjs')
const Rx2 = require('rxjs/operators')
const getChannel = (cursor) => {
return this.service.getData(`${url}?cursor=${cursor || ''}`)
.pipe(Rx2.map(resp => JSON.parse(resp.body)))
.pipe(Rx2.expand(body => { // recurse the api until no more cursors
return body.response_metadata &&
body.response_metadata.next_cursor ?
getChannel(body.response_metadata.next_cursor) : Rx.EMPTY
}))
.pipe(Rx2.pluck('channels'))
.pipe(Rx2.mergeAll()) // flattens array
.pipe(Rx2.filter(c => {
console.log('finding', c.name === 'development')
return c.name === 'development'
}))
}
find
回调应该 return 一个布尔值,而不是一个 Observable。例如
find(c => c.name === 'development')
更新
这是您的修改示例。我删除了生成器,因为它们比我们的案例需要的更复杂。
const { of, EMPTY, throwError } = rxjs;
const { filter, tap, expand, pluck, mergeAll } = rxjs.operators;
const DATA =
[ {channels: [{id: 123, name: 'test'}, {id:4, name: 'hello'}], cursor: 1}
, {channels:[{id: 1, name: 'world'}, {id: 2, name: 'knows'}], cursor: 2}
, {channels:[{id: 3, name: 'react'}, {id: 5, name: 'devcap'}], cursor: false}
];
function getChannel(){
return getBlock()
.pipe(
expand(x => x.cursor ? getBlock(x.cursor) : EMPTY),
pluck('channels'),
mergeAll(),
filter(c => c.name === 'devcap')
)
}
getChannel().subscribe({
next: console.log,
error: console.error
});
function getBlock(index = 0) {
if (index >= DATA.length){
throwError('Out of bounds');
}
return of(DATA[index]);
}
更新 2
由于仅通过 getChannel()
完成递归,您的解决方案无效。当您执行 expand
时——您 运行 另一个循环 getChannel()
。这意味着您 运行 pluck-mergeAll-filter
在每个递归获取的值上链接两次!采摘和奉承它两次给你 undefined
-- 因此错误。
在你的 playground 中——尝试分离出这段代码
let getValue = ()=>{
const next = gen.next();
if (!next || !next.value) { return EMPTY; }
return next.value;
}
并在扩展中使用它,如下所示:
let getChannel = () => {
return getValue()
.pipe(
expand(body => {
return body.cursor ? getValue() : EMPTY
}),
pluck('channels'),
mergeAll(),
filter(c => c.name === 'devcap'),
)
}
让我知道这是否是您正在寻找的功能https://codepen.io/jeremytru91/pen/wOQxbZ?editors=1111
const {
of,
EMPTY
} = rxjs;
const {
filter,
tap,
expand,
take,
pluck,
concatAll,
flatMap,
first
} = rxjs.operators;
function* apiResponses() {
yield of({channels: [{id: 123, name: 'test'}, {id:4, name: 'hello'}], cursor: 1});
yield of({channels:[{id: 3, name: 'react'}, {id: 5, name: 'devcap'}], cursor:3});
yield of({channels:[{id: 1, name: 'world'}, {id: 2, name: 'knows'}], cursor: 2});
yield of({channels:[{id: 4, name: 'react'}, {id: 6, name: 'devcap'}], cursor:4});
}
let gen = apiResponses();
function getChannel() {
return gen.next().value // simulate api call
.pipe(
expand(body => {
return body.cursor ? gen.next().value : EMPTY
}),
pluck('channels'),
flatMap(channels => {
const filtered = channels.filter(channel => channel.name === 'devcap')
if(filtered.length) {
return filtered;
}
return EMPTY;
}),
first()
);
}
getChannel().subscribe(data => {
console.log('DATA!! ', data)
}, e => {
console.log('ERROR', e)
throw e
})
我正在使用 rxjs v6.4.0。我正在尝试通过 API 搜索名称等于 "development" 的非常具体的频道进行分页。我正在使用 expand
递归调用 API 并获取新页面。最终结果为我提供了一个串联的频道列表。然后我过滤掉名称不等于 "development" 的所有频道。但是我收到一个错误:TypeError: You provided 'undefined' where a stream was expected. You can provide an Observable, Promise, Array, or Iterable.
const Rx = require('rxjs')
const Rx2 = require('rxjs/operators')
const getChannel = (cursor) => {
return this.service.getData(`${url}?cursor=${cursor || ''}`)
.pipe(Rx2.map(resp => JSON.parse(resp.body)))
.pipe(Rx2.expand(body => { // recurse the api until no more cursors
return body.response_metadata &&
body.response_metadata.next_cursor ?
getChannel(body.response_metadata.next_cursor) : Rx.EMPTY
}))
.pipe(Rx2.pluck('channels'))
.pipe(Rx2.mergeAll()) // flattens array
.pipe(Rx2.filter(c => {
console.log('finding', c.name === 'development')
return c.name === 'development'
}))
}
find
回调应该 return 一个布尔值,而不是一个 Observable。例如
find(c => c.name === 'development')
更新
这是您的修改示例。我删除了生成器,因为它们比我们的案例需要的更复杂。
const { of, EMPTY, throwError } = rxjs;
const { filter, tap, expand, pluck, mergeAll } = rxjs.operators;
const DATA =
[ {channels: [{id: 123, name: 'test'}, {id:4, name: 'hello'}], cursor: 1}
, {channels:[{id: 1, name: 'world'}, {id: 2, name: 'knows'}], cursor: 2}
, {channels:[{id: 3, name: 'react'}, {id: 5, name: 'devcap'}], cursor: false}
];
function getChannel(){
return getBlock()
.pipe(
expand(x => x.cursor ? getBlock(x.cursor) : EMPTY),
pluck('channels'),
mergeAll(),
filter(c => c.name === 'devcap')
)
}
getChannel().subscribe({
next: console.log,
error: console.error
});
function getBlock(index = 0) {
if (index >= DATA.length){
throwError('Out of bounds');
}
return of(DATA[index]);
}
更新 2
由于仅通过 getChannel()
完成递归,您的解决方案无效。当您执行 expand
时——您 运行 另一个循环 getChannel()
。这意味着您 运行 pluck-mergeAll-filter
在每个递归获取的值上链接两次!采摘和奉承它两次给你 undefined
-- 因此错误。
在你的 playground 中——尝试分离出这段代码
let getValue = ()=>{
const next = gen.next();
if (!next || !next.value) { return EMPTY; }
return next.value;
}
并在扩展中使用它,如下所示:
let getChannel = () => {
return getValue()
.pipe(
expand(body => {
return body.cursor ? getValue() : EMPTY
}),
pluck('channels'),
mergeAll(),
filter(c => c.name === 'devcap'),
)
}
让我知道这是否是您正在寻找的功能https://codepen.io/jeremytru91/pen/wOQxbZ?editors=1111
const {
of,
EMPTY
} = rxjs;
const {
filter,
tap,
expand,
take,
pluck,
concatAll,
flatMap,
first
} = rxjs.operators;
function* apiResponses() {
yield of({channels: [{id: 123, name: 'test'}, {id:4, name: 'hello'}], cursor: 1});
yield of({channels:[{id: 3, name: 'react'}, {id: 5, name: 'devcap'}], cursor:3});
yield of({channels:[{id: 1, name: 'world'}, {id: 2, name: 'knows'}], cursor: 2});
yield of({channels:[{id: 4, name: 'react'}, {id: 6, name: 'devcap'}], cursor:4});
}
let gen = apiResponses();
function getChannel() {
return gen.next().value // simulate api call
.pipe(
expand(body => {
return body.cursor ? gen.next().value : EMPTY
}),
pluck('channels'),
flatMap(channels => {
const filtered = channels.filter(channel => channel.name === 'devcap')
if(filtered.length) {
return filtered;
}
return EMPTY;
}),
first()
);
}
getChannel().subscribe(data => {
console.log('DATA!! ', data)
}, e => {
console.log('ERROR', e)
throw e
})