Using try_join to send MongoDB transaction operations concurrently

I'm new to Rust, and I'm using the default MongoDB driver: https://docs.rs/mongodb/2.0.0/mongodb/

I remember that when coding in Node.js it was possible to send the operations with a Promise.all() so that they all execute concurrently as an optimization, and the transaction is committed if there are no errors. (Node.js example here: https://medium.com/@alkor_shikyaro/transactions-and-promises-in-node-js-ca5a3aeb6b74)

I'm now trying to implement the same logic in Rust with try_join!, but I keep running into this error:

error[E0499]: cannot borrow `session` as mutable more than once at a time

use mongodb::{
    bson::{doc, oid::ObjectId, Document},
    options, Client,
};
use async_graphql::{Context, Object, Result};
use futures::try_join;
use std::time::Duration;
// use tokio::try_join; // -> same thing

#[derive(Default)]
pub struct UserMutations;

#[Object]
impl UserMutations {


async fn user_followed<'ctx>(
        &self,
        ctx: &Context<'ctx>,
        other_user_id: ObjectId,
        current_user_id: ObjectId,
    ) -> Result<bool> {

    let mut session = Client::with_uri_str(dotenv!("URI"))
        .await
        .expect("DB not accessible!")
        .start_session(None)
        .await?;

    session
        .start_transaction(Some(
            options::TransactionOptions::builder()
                .read_concern(Some(options::ReadConcern::majority()))
                .write_concern(Some(
                    options::WriteConcern::builder()
                        .w(Some(options::Acknowledgment::Majority))
                        .w_timeout(Some(Duration::new(3, 0)))
                        .journal(Some(false))
                        .build(),
                ))
                .selection_criteria(Some(options::SelectionCriteria::ReadPreference(
                    options::ReadPreference::Primary,
                )))
                .max_commit_time(Some(Duration::new(3, 0)))
                .build(),
        ))
        .await?;
    
   
    let db = Client::with_uri_str(dotenv!("URI"))
        .await
        .expect("DB not accessible!")
        .database("database")
        .collection::<Document>("collection");

    try_join!(
        db.update_one_with_session(
            doc! { "_id": other_user_id },
            doc! { "$inc": { "following_number": -1 } },
            None,
            &mut session,
        ),
        db.update_one_with_session(
            doc! { "_id": current_user_id },
            doc! { "$inc": { "followers_number": -1 } },
            None,
            &mut session,
        )
    )?;
    
    Ok(true)
  }
}

849 | |                     &mut session,
    | |                     ------------ first mutable borrow occurs here
...   |
859 | |                     &mut session,
    | |                     ^^^^^^^^^^^^ second mutable borrow occurs here
860 | |                 )
861 | |             )?;
    | |_____________- first borrow later captured here by closure

Is there any way to send the transactional operations concurrently, so that no time is wasted on independent mutations? Does anyone have any ideas? Thanks in advance!

This limitation is actually by design. In MongoDB, client sessions cannot be used concurrently (see here and here), so the Rust driver accepts them as `&mut` to prevent this at compile time. The Node example only works by accident and is definitely not recommended or supported behavior. If you want to perform both updates as part of a transaction, you have to run them one after the other. If you want to run them concurrently, you'll need to execute them without a session or a transaction.
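The compile-time guarantee can be seen without the driver at all: any API whose operations take `&mut self` permits sequential reuse but never two in-flight borrows. A minimal std-only sketch, with a toy `Session` type standing in for the driver's `ClientSession`:

```rust
// Toy stand-in for the driver's ClientSession: every operation takes
// &mut self, so the borrow checker forbids two concurrent operations.
struct Session {
    ops: Vec<&'static str>,
}

impl Session {
    fn update_one(&mut self, op: &'static str) -> usize {
        self.ops.push(op);
        self.ops.len()
    }
}

fn main() {
    let mut session = Session { ops: Vec::new() };

    // Sequential use compiles: each &mut borrow ends before the next starts.
    session.update_one("dec following_number");
    session.update_one("dec followers_number");

    // Something shaped like
    //     try_join!(f(&mut session), g(&mut session))
    // needs both mutable borrows alive at the same time, which is exactly
    // the E0499 error from the question.
    assert_eq!(session.ops.len(), 2);
    println!("ops recorded: {}", session.ops.len());
}
```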

As a side note, a client session can only be used with the client that created it. In the provided example, the session is being used with a different client, which will lead to an error.
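Putting the two points together, a corrected version of the mutation from the question would look roughly like this untested sketch (assuming the mongodb 2.0 API and a replica-set deployment; the single `Client` would typically be created once at startup and passed in, and the two updates run back to back inside the transaction):

```rust
use mongodb::{
    bson::{doc, oid::ObjectId, Document},
    Client,
};

async fn user_followed(
    client: &Client, // the session below MUST come from this same client
    other_user_id: ObjectId,
    current_user_id: ObjectId,
) -> mongodb::error::Result<bool> {
    let mut session = client.start_session(None).await?;
    session.start_transaction(None).await?;

    let db = client
        .database("database")
        .collection::<Document>("collection");

    // Sequential awaits: each operation releases its &mut borrow of the
    // session before the next one starts, so this compiles.
    db.update_one_with_session(
        doc! { "_id": other_user_id },
        doc! { "$inc": { "following_number": -1 } },
        None,
        &mut session,
    )
    .await?;

    db.update_one_with_session(
        doc! { "_id": current_user_id },
        doc! { "$inc": { "followers_number": -1 } },
        None,
        &mut session,
    )
    .await?;

    // Any error above propagates via `?` before the commit is reached.
    session.commit_transaction().await?;
    Ok(true)
}
```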

Thanks Patrick and Zeppi for your answers! I did some more research on this topic and ran my own tests. So, let's dive in.

First of all, my goal was to optimize transactional writes as much as possible, since I wanted the full rollback capability that the code logic requires.

In case you missed my comments to Patrick, I'll restate them here, since they better reflect my thinking on this:

I understand why this would be a limitation for multiple reads, but if all actions are on separate collections (or are independent atomic writes to multiple documents with different payloads) I don't see why it's impossible to retain causal consistency while executing them concurrently. This kind of transaction should never create race conditions / conflicts / weird lock behaviour, and in case of error the entire transaction is rolled back before being committed anyways.

Making an analogy with Git (which might be wrong), no merge conflicts are created when separate files / folders are updated. Sorry for being meticulous, this just sounds like a major speed boost opportunity.

However, while digging around I ran into this documentation: https://github.com/mongodb/specifications/blob/master/source/sessions/driver-sessions.rst#why-does-a-network-error-cause-the-serversession-to-be-discarded-from-the-pool

An otherwise unrelated operation that just happens to use that same server session will potentially block waiting for the previous operation to complete. For example, a transactional write will block a subsequent transactional write.

Basically, this means that even if you send transactional writes concurrently, you won't gain much efficiency, because MongoDB itself blocks on them. I decided to check whether this is true, and since the NodeJS driver setup allows sending transactions concurrently (per: https://medium.com/@alkor_shikyaro/transactions-and-promises-in-node-js-ca5a3aeb6b74), I made a quick NodeJS setup pointing to the same database, hosted by Atlas on the free tier.

Secondly, the stats and the code: here is the NodeJS variant I used for testing (4 transactional writes per test). I enabled GraphQL tracing to benchmark this, and here are my results...

export const testMutFollowUser = async (_parent, _args, _context, _info) => {
  try {

    const { user, dbClient } = _context;
    isLoggedIn(user);
    const { _id } = _args;


    const session = dbClient.startSession();
    const db = dbClient.db("DB");

    await verifyObjectId().required().validateAsync(_id);

    //making sure asked user exists
    const otherUser = await db.collection("users").findOne(
      { _id: _id },
      {
        projection: { _id: 1 }
      });


    if (!otherUser)
      throw new Error("User was not found");
    

    const transactionResult = await session.withTransaction(async () => {
        
        //-----using this part when doing concurrency test------

        // NOTE: no `await` on the individual array elements; awaiting each
        // one inside the array would serialize the operations and defeat
        // the purpose of Promise.all.
        await Promise.all([
          createObjectIdLink({ db_name: 'links', from: user._id, to: _id, db }),
          db.collection('users').updateOne(
            { _id: user._id },
            { $inc: { following_number: 1 } },
          ),
          db.collection('users').updateOne(
            { _id },
            {
              $inc: { followers_number: 1, unread_notifications_number: 1 }
            },
          ),
          createNotification({
            action: 'USER_FOLLOWED',
            to: _id
          }, _context)
        ]);
        //-----------end of concurrency part--------------------
        

        
        //------using this part when doing sync test--------

        //this as a helper for db.insertOne(...)
        const insertedId = await createObjectIdLink({ db_name: 'links', from: user._id, to: _id, db });


        const updDocMe = await db.collection('users').updateOne(
          { _id: user._id },
          { $inc: { following_number: 1 } },

        );

        const updDocOther = await db.collection('users').updateOne(
          { _id },
          {
            $inc: { followers_number: 1, unread_notifications_number: 1 }
          },

        );
        
        //this as another helper for db.insertOne(...)
        await createNotification({
          action: 'USER_FOLLOWED',
          to: _id
        }, _context);
        //-----------end of sync part---------------------------


        return true;


      }, transactionOptions);

      if (transactionResult) {
        console.log("The reservation was successfully created.");
      } else {
        console.log("The transaction was intentionally aborted.");
      }

      await session.endSession();

      return true;


  } catch (error) {
    console.error(error);
    throw error;
  }
};

And related performance results:

format: 
Request/Mutation/Response = Total (all in ms)

1) For sync writes in the transaction:

4/91/32 = 127
4/77/30 = 111
7/71/7 = 85
6/66/8 = 80
2/74/9 = 85
4/70/8 = 82
4/70/11 = 85
--waiting more time (~10secs)
9/73/34 = 116

totals/8 = **96.375 ms on average**

//---------------------------------

2) For concurrent writes in transaction:

3/85/7 = 95
2/81/14 = 97
2/70/10 = 82
5/81/11 = 97
5/73/15 = 93
2/82/27 = 111
5/69/7 = 81
--waiting more time (~10secs)
6/80/32 = 118

totals/8 = **96.75 ms on average**
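As a sanity check on the arithmetic, the two averages can be recomputed from the totals listed above:

```rust
fn main() {
    // Per-request totals (ms) copied from the two tables above.
    let sync_totals = [127.0, 111.0, 85.0, 80.0, 85.0, 82.0, 85.0, 116.0];
    let concurrent_totals = [95.0, 97.0, 82.0, 97.0, 93.0, 111.0, 81.0, 118.0];

    let avg = |xs: &[f64]| xs.iter().sum::<f64>() / xs.len() as f64;

    println!("sync avg: {} ms", avg(&sync_totals)); // 96.375 ms
    println!("concurrent avg: {} ms", avg(&concurrent_totals)); // 96.75 ms
}
```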

Conclusion: the difference between the two is within the margin of error (though still slightly in favor of the sync version).

My hypothesis is that with the sync approach you spend the time waiting on each database request/response, while with the concurrent approach you wait for MongoDB to order the requests and then execute them all, which in the end takes the same amount of time.

So, given MongoDB's current policy, I guess the answer to my question is: there is no need for concurrency here, because it won't affect performance anyway. However, it would be incredible if a future MongoDB version allowed parallelizing writes within a transaction using document-level locking (at least for the WiredTiger engine) instead of the database-level locking it currently uses for transactions (since right now you wait for each whole write to finish before the next one starts).

Feel free to correct me if I missed or misinterpreted anything. Thanks!