在 MongoDB 和 Socket.io 中使用 Change Stream 时,在 'change' 上被多次触发

While using Change Stream in MongoDB with Socket.io, the on 'change' is getting triggered multiple times

我一直在尝试捕获任何实时更新或插入 mongodb 并使用 socket.io 在下一页相应地更改信息。但是只要数据库中有任何更新,on 'change' 流就会被多次触发。我认为这也可能是套接字动作缓慢且延迟的原因。非常感谢任何形式的帮助。

const Order = new mongoose.model("Order", orderSchema);

io.on('connection', function(socket){
    console.log(socket.id);
    Order.watch([{ $match: {operationType: {$in: ['insert']}}}]).
    on('change', data => {
        console.log('Insert action triggered'); //getting triggered thrice
        console.log(new Date(), data.fullDocument);
        socket.emit("changes", data.fullDocument); 

    });
    Order.watch([{ $match: {operationType: {$in: ['update']}}}]).
    on('change', data => {
        console.log('Update action triggered'); //getting triggered thrice
        console.log(new Date(), data.updateDescription.updatedFields);
        socket.emit("customer", data);
    });

});

您需要将 mongodb changeStreams 放在 io.on('connection') 之外,并使用 io.emit() 而不是 socket.emit()

这里是示例代码:

const Order = new mongoose.model("Order", orderSchema);

io.on('connection', function(socket){
    console.log(socket.id);

    socket.on('disconnect', (reason) => {
        console.log(reason);
    });
});

Order.watch([{ $match: {operationType: {$in: ['insert']}}}]).
on('change', data => {
    console.log('Insert action triggered');
    console.log(new Date(), data.fullDocument);
    io.emit("changes", data.fullDocument); 

});
Order.watch([{ $match: {operationType: {$in: ['update']}}}]).
on('change', data => {
    console.log('Update action triggered');
    console.log(new Date(), data.updateDescription.updatedFields);
    io.emit("customer", data);
});

@swati sinha 的回答对我有用,但我也想使用 change stream 和 socket 在单独的函数中,所以这就是我做一点调整的原因

const Order = new mongoose.model("Order", orderSchema);

let OrderStream = Order.watch([{ $match: {operationType: {$in: ['insert']}}}])

io.on('connection', function(socket){
    console.log(socket.id);

    OrderStream.on('change',function(change){
        socket.emit('changes',change)
    })

    socket.on('disconnect', (reason) => {
        console.log(reason);
    });
});