RxJs:通过 API 递归分页并从列表中查找值

RxJs: Paginate through API recursively and find value from list

我正在使用 rxjs v6.4.0。我正在尝试通过 API 搜索名称等于 "development" 的非常具体的频道进行分页。我正在使用 expand 递归调用 API 并获取新页面。最终结果为我提供了一个串联的频道列表。然后我过滤掉名称不等于 "development" 的所有频道。但是我收到一个错误:TypeError: You provided 'undefined' where a stream was expected. You can provide an Observable, Promise, Array, or Iterable.

const Rx = require('rxjs')
const Rx2 = require('rxjs/operators')

const getChannel = (cursor) => {
  return this.service.getData(`${url}?cursor=${cursor || ''}`)
      .pipe(Rx2.map(resp => JSON.parse(resp.body)))
      .pipe(Rx2.expand(body => { // recurse the api until no more cursors
      return body.response_metadata && 
        body.response_metadata.next_cursor ? 
        getChannel(body.response_metadata.next_cursor) : Rx.EMPTY
    }))
    .pipe(Rx2.pluck('channels'))
    .pipe(Rx2.mergeAll()) // flattens array
    .pipe(Rx2.filter(c => {
      console.log('finding', c.name === 'development')
      return c.name === 'development'
    }))
}

find 回调应该 return 一个布尔值,而不是一个 Observable。例如

find(c => c.name === 'development')

更新

这是您的修改示例。我删除了生成器,因为它们比我们的案例需要的更复杂。

const { of, EMPTY, throwError } = rxjs;
const { filter, tap, expand, pluck, mergeAll } = rxjs.operators;

const DATA = 
  [ {channels: [{id: 123, name: 'test'}, {id:4, name: 'hello'}], cursor: 1}
  , {channels:[{id: 1, name: 'world'}, {id: 2, name: 'knows'}], cursor: 2}
  , {channels:[{id: 3, name: 'react'}, {id: 5, name: 'devcap'}], cursor: false}
  ];

function getChannel(){
  return getBlock()
    .pipe(
        expand(x => x.cursor ? getBlock(x.cursor) : EMPTY),
        pluck('channels'),
        mergeAll(),
        filter(c => c.name === 'devcap')
    )
}

getChannel().subscribe({
  next: console.log,
  error: console.error
});


function getBlock(index = 0) {
  if (index >= DATA.length){
    throwError('Out of bounds');
  }

  return of(DATA[index]);
}

更新 2

由于仅通过 getChannel() 完成递归,您的解决方案无效。当您执行 expand 时——您 运行 另一个循环 getChannel()。这意味着您 运行 pluck-mergeAll-filter 在每个递归获取的值上链接两次!采摘和奉承它两次给你 undefined -- 因此错误。

在你的 playground 中——尝试分离出这段代码

let getValue = ()=>{
    const next = gen.next();
    if (!next || !next.value) { return EMPTY; }
    return next.value;
}

并在扩展中使用它,如下所示:

let getChannel = () => {
   return getValue()
      .pipe(
        expand(body => {
          return body.cursor ? getValue() : EMPTY
        }),
        pluck('channels'),
        mergeAll(),
        filter(c => c.name === 'devcap'),
      )
}

让我知道这是否是您正在寻找的功能https://codepen.io/jeremytru91/pen/wOQxbZ?editors=1111

const {
  of,
  EMPTY
} = rxjs;

const {
  filter,
  tap,
  expand,
  take,
  pluck,
  concatAll,
  flatMap,
  first
} = rxjs.operators;

function* apiResponses() {
  yield of({channels: [{id: 123, name: 'test'}, {id:4, name: 'hello'}], cursor: 1});
  yield of({channels:[{id: 3, name: 'react'}, {id: 5, name: 'devcap'}], cursor:3});
  yield of({channels:[{id: 1, name: 'world'}, {id: 2, name: 'knows'}], cursor: 2});
  yield of({channels:[{id: 4, name: 'react'}, {id: 6, name: 'devcap'}], cursor:4});
}

let gen = apiResponses();

function getChannel() {
  return gen.next().value // simulate api call
    .pipe(
      expand(body => {
        return body.cursor ? gen.next().value : EMPTY
      }),
      pluck('channels'),
      flatMap(channels => {
        const filtered = channels.filter(channel => channel.name === 'devcap')
        if(filtered.length) {
          return filtered;
        }
        return EMPTY;
      }),
    first()
    );
}


getChannel().subscribe(data => {
  console.log('DATA!! ', data)
  }, e => {
  console.log('ERROR', e)
  throw e
})