如何改善或避免流星出版物中的查找/获取循环?

How to improve or avoid find / fetch cycle in meteor's publication?

TL;DR:

聊天是一个 collection。 ChatMess 是另一个消息引用聊天 _id 的消息。如何以尽可能少的计算从聊天列表中获取最后一条消息?在这里,循环中的查找/获取周期太重太长了。

我有这个出版物用来return一组游标给用户:

目前,逻辑是:

这是我的主要问题:

有没有更快的方法从我的每次聊天中获取最后一条消息 session?使用该算法,我很容易达到 8000 毫秒的响应时间,这是一种过于繁重的计算时间,因为大部分时间都花在查找/获取聊天消息的 _id(参见 Kadira 的链接屏幕)。

    Meteor.publish("publishNewChat", function() {
    this.unblock();

    // we get a list of chat _id
    let chatIdList = _get_all_the_user_chats_ids(this.userId);

    if (!chatList)
        return ;

    // get the chat sessions objects
    let chats_cursor = Modules.both.queryGet({
                    type        : 'chat',
                    method      : 'find',
                    query       : { _id: { $in: chatIdList } },
                    projection  : { sort: { _id: 1 }, limit : 1000 }
                });

    let array_of_fetched_chats = chats_cursor.fetch();
    let chat_ids = [];

    // and here we loop through the chat documents in order to get the last message that's been attached to each of them
    array_of_fetched_chats.forEach(function(e) {
        let lastMess = Modules.both.queryGet({
                            type        : 'chatMess',
                            method      : 'findOne',
                            query       : { chatId: e._id },
                            projection  : { sort: { date: -1 } }
                        });

        if (lastMess)
            chat_ids.push(lastMess._id);
    });

    return ([
        chats_cursor,
        Modules.both.queryGet({
            type        : 'chatMess',
            method      : 'find',
            query       : { _id: { $in: chat_ids } },
            projection  : { sort: { date: -1 }, limit: 1000 }
        })
    ]);
    });

最后,它还为我随后的所有 DDP 请求增加了延迟。我目前使用 this.unblock() 来避免这种情况,但我不想在这里使用它。

仅供参考,每次客户端更改其当前活动聊天时我都会更新另一个发布 session:在客户端上,路由到新聊天将其 _id 添加到更新我的 getChatMess 订阅的反应数组中为了在客户端上获取用户自连接以来访问过的每个聊天的消息。目标显然是让服务器免于发送用户一生中访问过的每个聊天 session 中的每条消息。

不幸的是,我缺乏在不破坏我所有聊天逻辑的情况下改进该算法的想法:S。你知道吗?你会怎么做?

谢谢。

编辑:这是来自 kadira 的屏幕,清楚地显示了问题:

您是否考虑过使用 reywood/publishComposite package? 使用此包,您可以使用相同的方法发布相关数据,而无需执行大量逻辑来发布正确的数据。

下面的代码应该可以帮助您入门:

Meteor.publishComposite("publishNewChat", function() {
return [{
    find:function(){
        return Users.find({ _id: this.userId },{fields:{"profile.chat":1}});
    },
    children:[{
        find:function(user){ //this function is passed each user returned from the cursor above.
            return UserChats.find({userId:user._id},{fields:{blah:1,blah:1}}); //find the user chats using whatever query 
        },
        children:[
            //if there are any children of user chats that you need to publish, do so here...
            {
                find:function(userchat){
                    return Chats.find({_id:userchat.chatId})
                },
                children:[
                    {
                        find:function(chat){
                            return ChatMess.find({chatId:chat._id},{ sort: { date: -1 } });
                        },
                        children:[
                            {
                                find:function(chatMess){
                                    var uids = _.without(chatMess.participants, this.userId);
                                    return Users.find({_id:{$in:uids}});
                                }
                            }
                        ]
                    }
                ]
            }
        ]
    },
    ]
}]

这将发布与每个父文档相关的所有文档的游标。它非常快,我在高流量和大数据集的生产平台上使用这个包没有问题。然后您可以在客户端上正常查询文档以获得您需要显示的文档。

类似于:

Users.findOne({_id:Meteor.userId()});
UserChats.find({userId:Meteor.userId()});
etc...

这是我开发的解决方案:

Meteor.publish("publishNewChat", function() {
this.unblock();

let user = Modules.both.queryGet({
                type        : 'users',
                method      : 'findOne',
                query       : { _id: this.userId },
                projection  : { fields: { "profile.chat": true } }
            });

let thisUserschats = tryReach(user, "profile", "chat").value;

if (!thisUserschats)
    return ;

thisUserschats = thisUserschats.map(function(e) { return (e.chatId); });

let chats = Modules.both.queryGet({
                type        : 'chat',
                method      : 'find',
                query       : { _id: { $in: thisUserschats } },
                projection  : { sort    : { _id: 1 },
                                limit   : 1000
                              }
            });

let chatArray = chats.fetch(),
    uids = cmid = [];

let messages_id_list = [],
    i = chatArray.length;

let _parallelQuery = index => {
    Meteor.setTimeout(function () {
        let tmp = Modules.both.queryGet({
                      type      : 'chatMess',
                      method    : 'find',
                      query     : { chatId: chatArray[index]._id },
                      projection: { limit: 1, sort: { date: -1 } }
                  });

        tmp.forEach(doc => {
            messages_id_list.push((doc && doc._id) ? doc._id : null);
        });
    }, 1);
}

while (--i >= 0)
    _parallelQuery(i);

let cursors = {
    chats           : chats,
    chatMessages    : null
}

let interval = Meteor.setInterval(function () {
    if (messages_id_list.length === chatArray.length)
    {
        Meteor.clearInterval(interval);

        cursors.chatMessages = Modules.both.queryGet({
                                    type        : 'chatMess',
                                    method      : 'find',
                                    query       : { _id: { $in: messages_id_list } },
                                    projection  : { sort: { date: -1 }, limit: 1000 }
                               });

        cursors.chats.observeChanges({
            // ...
        });

        cursors.chatMessages.observeChanges({
            // ...
        });

        self.ready();

      self.onStop(() => subHandle.stop(); );
    }
}, 10);

});

我使用带有 Meteor.setTimeout 的异步函数来并行化查询并保存一个引用聊天 _id 的索引以供查找。然后,当查询完成时,我将最后一条消息添加到数组中。使用 Meteor.setInterval,我检查数组长度以了解所有查询何时完成。然后,由于我不能再 return 游标了,我使用 Meteor 发布低级别 API 来处理文档的发布。

仅供参考:在第一次尝试中,我在我的 _parallelQueries 中使用了 'findOne',它将我的计算时间除以 2/3。但是后来,多亏了一位朋友,我尝试了 cursor.foreach() 函数,它让我再次将计算时间除以 2 !

在生产环境中,基准测试让我的响应时间从 7/8 秒减少到 1.6 秒的平均响应时间:)

希望这对你们有用! :)