使用 JSONCollection 而不是 BSONCollection 的事务

Transactions With JSONCollection instead of BSONCollection

我在通过 JSONCollection 执行事务时遇到问题,出现以下错误:

JsResultException(errors:List((,List(JsonValidationError(List(CommandError[code=14, errmsg=BSON field 'OperationSessionInfo.txnNumber' is the wrong type 'int', expected type 'long', doc: {"operationTime":{"$time":1596894245,"$i":5,"$timestamp":{"t":1596894245,"i":5}},"ok":0,"errmsg":"BSON field 'OperationSessionInfo.txnNumber' is the wrong type 'int', expected type 'long'","code":14,"codeName":"TypeMismatch","$clusterTime":{"clusterTime":{"$time":1596894245,"$i":5,"$timestamp":{"t":1596894245,"i":5}},"signature":{"hash":{"$binary":"0000000000000000000000000000000000000000","$type":"00"},"keyId":0}}}]),WrappedArray())))))

我试图将项目改为使用 BSONCollection,但遇到了一些麻烦;也许有办法在继续使用 JSONCollection 的情况下解决上述错误。该异常也会在测试 update 方法时出现,但我检查过 insertOneViaTransaction 和 setRuleAsInactiveViaTransaction 都已成功完成。这是我的事务代码: update:

 /**
  * Replaces a rule inside a single transaction: inserts the updated version of
  * `oldRule` and then marks the rule identified by `oldRule.id` as inactive.
  *
  * @param oldRule the existing rule to supersede
  * @return the deactivated rule together with the newly inserted one
  */
 def update(oldRule: ExistRuleDto): Future[UpdateResult] =
    makeTransaction[UpdateResult] { txCollection =>
      // Insert the new version first (it references the old rule) ...
      dao.insertOneViaTransaction(txCollection, oldRule.toUpdatedRule).flatMap { insertedRule =>
        // ... then deactivate the previous version.
        dao.setRuleAsInactiveViaTransaction(txCollection, oldRule.id).map { deactivatedRule =>
          UpdateResult(deactivatedRule, insertedRule)
        }
      }
    }

makeTransaction:

/**
 * Runs `block` inside a MongoDB transaction on `dao.collection`.
 *
 * A session and a transaction are started, `block` receives a collection
 * resolved from the transactional DB reference, then the transaction is
 * committed and the session is ended.
 *
 * If `block` or the commit fails, the transaction is aborted and the session is
 * ended (both best-effort) before the original failure is propagated, so a
 * failed run no longer leaves an open transaction/session behind.
 *
 * NOTE(review): the `txnNumber ... wrong type 'int'` error was reported while
 * using `JSONCollection` here; a `BSONCollection` based variant presumably
 * avoids it — confirm against the driver version in use.
 *
 * @param block operations to execute against the transactional collection
 * @tparam Out type of the value produced by `block`
 * @return the value produced by `block`, once committed and the session ended
 */
def makeTransaction[Out](block: JSONCollection => Future[Out]): Future[Out] = {
  // Runs `cleanup` best-effort and then fails with the original `cause`, so a
  // secondary cleanup error never hides why the transaction failed.
  def failAfter(cause: Throwable)(cleanup: => Future[Any]): Future[Out] =
    cleanup
      .recover { case scala.util.control.NonFatal(_) => () }
      .flatMap(_ => Future.failed[Out](cause))

  for {
    dbWithSession <- dao.collection.db.startSession()

    res <- (for {
      dbWithTx <- dbWithSession.startTransaction(None)
      coll = dbWithTx.collection[JSONCollection](dao.collection.name)

      // Operations: going through `flatMap` also captures a synchronous throw
      // from `block`, so the abort below still runs in that case.
      committed <- Future.successful(coll).flatMap(block).flatMap { out =>
        dbWithTx.commitTransaction().map(_ => out)
      }.recoverWith {
        case scala.util.control.NonFatal(cause) =>
          failAfter(cause)(dbWithTx.abortTransaction())
      }
    } yield committed).recoverWith {
      case scala.util.control.NonFatal(cause) =>
        failAfter(cause)(dbWithSession.endSession())
    }

    // Success path: ending the session stays part of the result, as before.
    _ <- dbWithSession.endSession()
  } yield res
}

insertOneViaTransaction:

  /**
   * Inserts `rule` through the given (transactional) collection.
   *
   * @param collection collection reference to insert into
   * @param rule       the rule to persist
   * @return the same `rule` when the write result reports exactly one successful write;
   *         otherwise a failed future: `DuplicationError` when the failure matches
   *         `WriteResult.Code(11000)`, `GeneralDBError` for anything else
   */
  def insertOneViaTransaction(collection: JSONCollection, rule: Rule): Future[Rule] = {
    val inserted: Future[Rule] = collection.insert.one(rule).map { writeResult =>
      writeResult match {
        case DefaultWriteResult(true, 1, _, _, _, _) => rule
        case unexpected =>
          throw GeneralDBError(s"$rule was not inserted, something went wrong: $unexpected")
      }
    }

    // Translate driver failures (and the error raised above) into domain errors.
    inserted.recover {
      case WriteResult.Code(11000) => throw DuplicationError(s"$rule exist on DB")
      case failure => throw GeneralDBError(failure.getMessage)
    }
  }

setRuleAsInactiveViaTransaction:

 /**
  * Marks the rule with the given id as inactive and removes its
  * "last version exists" metadata field, returning the updated document.
  *
  * @param collection collection reference to run the update against
  * @param ruleId     id of the rule to deactivate
  * @return the rule as stored after the update; fails with `GeneralUpdateError`
  *         (after logging) when no resulting document can be read as a `Rule`
  */
 def setRuleAsInactiveViaTransaction(collection: JSONCollection, ruleId: BSONObjectID): Future[Rule] = {
    val selector = Json.obj(s"${Rule.ID}" -> ruleId)
    val modifier = Json.obj(
      "$set" -> Json.obj(s"${Rule.Metadata}.${Metadata.Active}" -> false),
      "$unset" -> Json.obj(s"${Rule.Metadata}.${Metadata.LastVersionExists}" -> ""))

    val updated = collection.findAndUpdate(
      selector,
      modifier,
      fetchNewObject = true,
      upsert = false,
      sort = None,
      fields = None,
      bypassDocumentValidation = false,
      writeConcern = WriteConcern.Acknowledged,
      maxTime = None,
      collation = None,
      arrayFilters = Nil
    )

    updated.map { outcome =>
      outcome.result[Rule] match {
        case Some(rule) => rule
        case None =>
          val msg = s"Operation fail for updating ruleId = $ruleId"
          logger.error(msg)
          throw GeneralUpdateError(msg)
      }
    }
  }

我正在使用以下依赖项: Play:

    // Play sbt plugin, version 2.7.2 (closing quote of the version string restored)
    "com.typesafe.play" % "sbt-plugin" % "2.7.2"

Reactivemongo:

    "org.reactivemongo" %% "play2-reactivemongo" % "0.18.8-play27"

已解决。(不适用于紧凑型)序列化器:

  /**
   * BSON handler for play-json `JsValue`, delegating both directions to `BSONFormats`.
   *
   * NOTE(review): `read`/`write` are themselves declared `implicit`, which also
   * exposes them as implicit conversions wherever this object's members are
   * imported — kept as-is for compatibility; confirm that this is intended.
   */
  implicit object JsValueHandler extends BSONHandler[BSONValue, JsValue] {
    /** Converts a BSON value to its JSON representation. */
    implicit override def read(bson: BSONValue): JsValue = BSONFormats.toJSON(bson)

    /**
     * Converts a JSON value to BSON.
     *
     * Fails with a `NoSuchElementException` carrying the offending value and the
     * conversion result, instead of the bare error raised by calling `.get`.
     */
    implicit override def write(j: JsValue): BSONValue = {
      val converted = BSONFormats.toBSON(j)

      converted.getOrElse(
        throw new NoSuchElementException(s"Cannot convert JSON value to BSON: $j ($converted)"))
    }
  }

asTransaction:

/**
 * Runs `block` inside a MongoDB transaction on `collection`.
 *
 * A session and a transaction are started, `block` receives a `BSONCollection`
 * resolved from the transactional DB reference, then the transaction is
 * committed and the session is ended.
 *
 * If `block` or the commit fails, the transaction is now actually aborted and
 * the session ended (both best-effort) before the failure is reported; before,
 * the warning announced a rollback but nothing was aborted and the session was
 * left open.
 *
 * @param block operations to execute against the transactional collection
 * @tparam Out type of the value produced by `block`
 * @return the value produced by `block`; any `Exception` is logged and
 *         re-raised as `GeneralDBErrorOnTx` carrying the original message
 */
def asTransaction[Out](block: BSONCollection => Future[Out]): Future[Out] = {
  // Runs `cleanup` best-effort and then fails with the original `cause`, so a
  // secondary cleanup error never hides why the transaction failed.
  def failAfter(cause: Throwable)(cleanup: => Future[Any]): Future[Out] =
    cleanup
      .recover { case scala.util.control.NonFatal(_) => () }
      .flatMap(_ => Future.failed[Out](cause))

  for {
    dbWithSession <- collection.db.startSession()

    out <- (for {
      dbWithTx <- dbWithSession.startTransaction(None)
      collectionWithTx = dbWithTx.collection[BSONCollection](collection.name)

      // Going through `flatMap` also captures a synchronous throw from `block`,
      // so the abort below still runs in that case.
      committed <- Future.successful(collectionWithTx).flatMap(block).flatMap { result =>
        dbWithTx.commitTransaction().map(_ => result)
      }.recoverWith {
        case scala.util.control.NonFatal(cause) =>
          failAfter(cause)(dbWithTx.abortTransaction())
      }
    } yield committed).recoverWith {
      case scala.util.control.NonFatal(cause) =>
        failAfter(cause)(dbWithSession.endSession())
    }

    // Success path: ending the session stays part of the result, as before.
    _ <- dbWithSession.endSession()
  } yield out
}.recover {
  case ex: Exception =>
    logger.warn(s"asTransaction failed with ex: ${ex.getMessage}, rollback to previous state...")
    throw GeneralDBErrorOnTx(ex.getMessage)
}

事务示例:

 /**
  * Within one `ruleDao.asTransaction` call, sets the active flag of the given
  * rules to `active` and then registers an `Update` event per rule.
  *
  * @param oldRules rules whose active flag is changed
  * @param active   the value to store in the active flag
  * @return a future completing with `()` once both steps have finished
  */
 def `change visibility of ExistsRules and insert UpdateEvents`(oldRules: List[Rule], active: Boolean): Future[Unit] = {
    ruleDao.asTransaction { txCollection =>
      // (1) Set the active flag on every old rule, selected by id.
      val byIds = BSONDocument(s"${Rule.ID}" -> BSONDocument("$in" -> oldRules.map(_._id)))
      val setActive = BSONDocument("$set" -> BSONDocument(s"${Rule.Metadata}.${Metadata.Active}" -> active))

      ruleDao
        .updateManyWithBsonCollection(txCollection, filter = byIds, update = setActive)
        .flatMap { _ =>
          // (2) Sync the cache by emitting one Update event per rule.
          eventsService
            .addEvents(oldRules.map(rule => RuleEvent(rule.metadata.cacheKey, Update)))
            .map(_ => ())
        }
    }
  }

尽情享受吧!