在 RxJS 中使用过滤运算符

Using Filter Operator in RxJS

更新:我更改了数据库的投票节点。我的结构似乎没有意义。

这是我的数据库结构:

-threadsMeta
    -posts
        -postId


-votes
    -threadId
        -postId
            -uid: "down" //or "up"

我认为下面代码中的注释描述了预期行为与实际行为。

getMyUpVotes(threadId: string, uid: string): Observable<any> {
    //Get all the postId's for this thread
    let myUpVotes = this.af.database.list(`threadsMeta/${threadId}/posts`)
    .map(posts => {
        //Put each postId into a Firebase query path along with the uid from the method's params
        posts.forEach(post => {
            this.af.database.object(`votes/${threadId}/${post.$key}/upVotes/${uid}`)
            //Emit only upvotes from this user on this post
            .filter(data => data.$value === true)
        })
    })
    myUpVotes.subscribe(data => console.log(data)) //Undefined
    return myUpVotes
}

我想你想要:

let myUpVotes = this.af.database.list(`threadsMeta/${threadId}/posts`)
    .flatMap(posts => 
          Observable.from(posts)
              .flatMap(post=>this.af.database.object(
                   `votes/${threadId}/${post.$key}/upVotes/${uid}`));
myUpVotes = myUpVotes.flatMap(t=>t.filter(x=>x.uid === true));

以下方法将 return 给定线程 + 给定用户点赞的帖子数组:

getMyUpVotes(threadId: string, uid: string): Observable<any> {
  return this.af.database.list(`threadsMeta/${threadId}/posts`).take(1)
    // Flatten the posts array to emit each post individually.
    .mergeMap(val => val)
    // Fetch the upvotes for the current user on the current post.
    .mergeMap(post =>
      this.af.database.object(`votes/${threadId}/${post.$key}/upVotes/${uid}`).take(1)
        // Only keep actual upvotes
        .filter(upvote => upvote.$value === true)
        // Convert the upvote to a post, since this is the final value to emit.
        .map(upvote => post)
    )
    // Gather all posts in a single array.
    .toArray();
}

我添加了 .take(1) 以强制 Firebase observables 完成,以便可以使用 toArray() 收集最终结果。这也意味着一旦你获得了赞成票,你就不再关注未来的价值变化。让我知道这是否有问题。

重要。您应该从方法外部订阅可观察对象。

我创建了一个可运行版本来说明(请注意,我使用的是普通可观察对象,因为在此环境中 Angular 和 Firebase 都不可用):

const getPostsForThread = (threadId) => {
  return Rx.Observable.of([
    { key: 'jcormtp', title: 'Some post', threadId: threadId },
    { key: 'qapod', title: 'Another post', threadId: threadId },
    { key: 'bvxspo', title: 'Yet another post', threadId: threadId }
  ]);
}

const getUpvotesPerPostPerUser = (postId, uid) => {
  return Rx.Observable.from([
    { postId: 'jcormtp', uid: 'bar', value: true },
    { postId: 'qapod', uid: 'bar', value: false },
    { postId: 'bvxspo', uid: 'bar', value: true }
  ]).filter(uv => uv.postId == postId && uv.uid == uid);
}

const getMyUpVotes = (threadId: string, uid: string): Rx.Observable<any> => {
  return getPostsForThread(threadId)
    // Flatten the posts array to emit each post individually.
    .mergeMap(val => val)
    // Fetch the upvotes for the current user on the current post.
    .mergeMap(post =>
      getUpvotesPerPostPerUser(post.key, uid)
        // Only keep actual upvotes
        .filter(upvote => upvote.value === true)
        // Convert the upvote to a post, since this is the final value to emit
        .map(upvote => post)
    )
    // Gather all posts in a single array
    .toArray();
}

// NB. subscribe() outside of the method
getMyUpVotes('foo', 'bar')
  .subscribe(val => console.log(val));
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.0.1/Rx.min.js"></script>

所以我确实想出了一个非常适合我的特定情况的解决方案。我只是对 observables 了解不够,不知道这是否是最好的方法。无论如何,我用 posts 查询结束了 'joining' 查询。这适用于我的特定情况,因为我正在使用 ngFor 渲染 posts。我可以通过以下方式检查当前用户是否有 upvoted/downvoted 特定的 post:

<i [class.voted]="post.vote === 'down'" class="fa fa-caret-down" aria-hidden="true"></i>

这是我获取数据的方式:

getPosts(threadId: string, uid: string): Observable<any> {
    let result = this.af.database.list(`/threads/${threadId}/posts`)
    .switchMap(posts => {
        let joinedObservables: any[] = [];
        posts.forEach(post => {
            joinedObservables.push(this.af.database
                .object(`votes/${threadId}/${post.$key}/${uid}`)
                .do(value => {
                    post.vote = value.$value
                })
            )
        })
        return Observable.combineLatest(joinedObservables, () => posts);

    });
    return result
}

在这个实现中,我不断从投票中获取排放量,这很重要,因为用户投票必须反映在页面上,不仅是在加载时,而且如果他们改变了他们的投票。

我想知道是否有人预见到这方面的任何问题。我对这项技术一点也不擅长,我想知道我是否遗漏了什么。在选择这个作为答案之前,我会让其他回答过的人有机会表达任何反对意见并根据需要编辑他们自己的答案。