合并两个集合时间戳数据并显示实时结果

merging two collections time stamped data and showing real time result

InocmesExpenses集合在整个应用程序的许多地方单独使用。只有一页具有以下要求。我不相信 Mongodb 中没有解决方法,它确实有数百万用户:(

我在一个项目中使用 React-Meteor,有两个名为 IncomesExpenses 的集合。收入文件如下所示

{
    "_id" : "euAeJYArsAyFWLJs6",
    "account" : "3m5Zxsije9b6ZaNpu",
    "amount" : 3,
    "receivedAt" : ISODate("2017-07-07T06:21:00.000Z"),
    "type" : "project",
    "project" : {
        "_id" : "ec2WLt3GzHNwhK7oK",
        "name" : "test"
    },
    "owner" : "nM4ToQEbYBsC3NoQB",
    "createdAt" : ISODate("2017-07-07T06:21:37.293Z")
}

下面是费用文档的样子

{
    "_id" : "snWDusDbkLHHY2Yry",
    "account" : "3m5Zxsije9b6ZaNpu",
    "amount" : 4,
    "spentAt" : ISODate("2017-07-07T06:21:00.000Z"),
    "description" : "4",
    "category" : {
        "_id" : "vh593tw9dZgNdNwtr",
        "name" : "test",
        "icon" : "icon-icons_tution-fee"
    },
    "owner" : "nM4ToQEbYBsC3NoQB",
    "createdAt" : ISODate("2017-07-07T06:22:04.215Z")
}

现在我有一个名为交易的页面,我必须在其中显示基于时间的所有交易(收入和支出),因此我的交易发布代码如下所示

import { Meteor } from 'meteor/meteor';
import { Incomes } from '../../incomes/incomes.js';
import { Expenses } from '../../expences/expenses.js';
import { Counter } from 'meteor/natestrauser:publish-performant-counts';


let datefilter = (options, query) => {
    let dateQuery = {$gte: new Date(options.dateFilter.start), $lte: new Date(options.dateFilter.end)};
    let temp = {$or: [{receivedAt: dateQuery}, {spentAt: dateQuery}]};
    query.$and.push(temp);
};
Meteor.publish('transactions', function(options) {
    let query = {
        owner: this.userId,
        $and: []
    };

    if(options.accounts.length)
        query['account'] = {$in: options.accounts};

    options.dateFilter && datefilter(options, query);
    //here i also apply other filter based on category and project which does not matter so i removed

    if(!query.$and.length) delete query.$and;

    //computing 'Transactions' below
    return [
        Incomes.find(query, {
            sort: {
                receivedAt: -1
            },
            limit: options.limit,
            skip: options.skip
        }),
        Expenses.find(query, {
            sort: {
                spentAt: -1
            },
            limit: options.limit,
            skip: options.skip
        })
    ]
}); 

到这里一切正常,直到我必须在交易页面上实现分页,所以这里的数据必须按日期排序。这才是真正的问题。假设我的两个集合各有 10 条记录,我的模板页面必须包含前 10 个结果,所以我发送了 skip 0 和 limit 10,在 return 我得到了 10 条收入和 10 条支出记录,而在第二页上没有记录因为 skip 和 limit 都发送了 10。那么如何处理呢?我也使用了 counter Technic,但没有用。记住我的数据也是实时的。 任何帮助将不胜感激:)

This is a Temporary Solution for a quick join Issue, You should consider your data-set amount and passed all checks before apply Meantime I will change my schema as @NeilLunn suggested and plan migrations shortly.

为了满足显示要求的临时修复,我应用了 aggregate$out 的组合。现在代码如下所示

Meteor.publish('transaction', function(options){
    //here I perform many filter based on query and options
    //which deleted to shorten code on SO

    //call asynchronous method just to get result delay :)
    Meteor.call('copyTransactions', (err, res) => {
        //something do here
    });
    //note the we return results from **Transactions** which is comes as third collection
    return [
        Transactions.find(query, {
            sort: sortbyDate,
            skip: options.skip,
            limit: options.limit,
        }),
        new Counter('transactionsCount', Transactions.find(query, {
            sort: sortbyDate
        }))
    ];
});

现在发布 Transactions(一个单独的集合),我希望将其作为合并集合。请注意,此名称以 s 结尾,如 transactions 所以不要与上面的 (transaction)

混淆

publish the merge collection separately as "transactions"

Meteor.publish('transactions', function(limit){
    return Transactions.find(
        {
            owner: this.userId
        });
});

这是最重要的方法,它在发布中调用合并第三个集合中的两个集合,其中我首先 aggregated 所有结果都为 $out,然后将第二个集合附加为 batch 插入

import { Expenses } from '../../../api/expences/expenses'
import { Incomes } from '../../../api/incomes/incomes'
import { Transactions } from '../transactions'
import Future from 'fibers/future';
export const copyTransactions = new ValidatedMethod({
    name: 'copyTransactions',
    validate:null,
    run() {
        //we are using future here to make it asynchronous
        let fu = new Future();
        //here Expenses directly copied in new collection name Transactions
        // TODO: use $rename or $addField in aggregate instead of $project
        Expenses.aggregate([{
            $project: {
                account : "$account",
                amount : "$amount",
                transactionAt : "$spentAt",
                description : "$description",
                category : "$category",
                type: {
                    $literal: 'expense'
                },
                owner : "$owner",
                createdAt : "$createdAt"
            }
        }, {
            $out: "transactions"
        } ]);
        //now append Transactions collection with incomes with batch insert
        Incomes.aggregate([{
            $project: {
                account : "$account",
                amount : "$amount",
                transactionAt : "$receivedAt",
                type:{
                    $literal: 'income'
                },
                project : "$project",
                owner : "$owner",
                createdAt : "$createdAt"
            }
        }], function (err, result) {
            //if no doc found then just return
            if(!result.length){
                fu.return('completed')
            }
            else{
                Transactions.batchInsert(result, function(err, res){
                    fu.return('completed')
                })
            }

        });
        return fu.wait();
    }
});

If Second Collection aggregated too with $out then it will overwrite :(

@client 我只需要使用我的选项和查询订阅 'transaction' 并从 Transactions

获得实时合并结果