使用 JSONCollection 而不是 BSONCollection 的事务
Transactions With JSONCollection instead of BSONCollection
我在通过 JSONCollection 进行交易时遇到问题,出现以下错误:
JsResultException(errors:List((,List(JsonValidationError(List(CommandError[code=14, errmsg=BSON field 'OperationSessionInfo.txnNumber' is the wrong type 'int', expected type 'long', doc: {"operationTime":{"$time":1596894245,"$i":5,"$timestamp":{"t":1596894245,"i":5}},"ok":0,"errmsg":"BSON field 'OperationSessionInfo.txnNumber' is the wrong type 'int', expected type 'long'","code":14,"codeName":"TypeMismatch","$clusterTime":{"clusterTime":{"$time":1596894245,"$i":5,"$timestamp":{"t":1596894245,"i":5}},"signature":{"hash":{"$binary":"0000000000000000000000000000000000000000","$type":"00"},"keyId":0}}}]),WrappedArray())))))
我试图将我的项目更改为 BSONCollection 但遇到了一些麻烦,也许有解决方案可以克服 JSONCollection 的上述错误。
测试更新方法时也发生异常,但检查 insertOneViaTransaction 和 setRuleAsInactiveViaTransaction 已成功完成
这是我的交易代码:
更新:
def update(oldRule: ExistRuleDto): Future[UpdateResult] = {
val transaction = (collection: JSONCollection) => for {
newRule <- dao.insertOneViaTransaction(collection,oldRule.toUpdatedRule) // insert new with ref to old
oldRule <- dao.setRuleAsInactiveViaTransaction(collection,oldRule.id)
} yield UpdateResult(oldRule, newRule)
makeTransaction[UpdateResult](transaction)
}
制作交易:
def makeTransaction[Out](block: JSONCollection => Future[Out]): Future[Out] = for {
dbWithSession <- dao.collection.db.startSession()
dbWithTx <- dbWithSession.startTransaction(None)
coll = dbWithTx.collection[JSONCollection](dao.collection.name)
// Operations:
res <- block(coll)
_ <- dbWithTx.commitTransaction()
_ <- dbWithSession.endSession()
} yield res
insertOneViaTransaction:
def insertOneViaTransaction(collection: JSONCollection, rule: Rule): Future[Rule] = {
collection.insert.one(rule).map {
case DefaultWriteResult(true, 1, _, _, _, _) => rule
case err => throw GeneralDBError(s"$rule was not inserted, something went wrong: $err")
}.recover {
case WriteResult.Code(11000) => throw DuplicationError(s"$rule exist on DB")
case err => throw GeneralDBError(err.getMessage)
}
}
setRuleAsInactiveViaTransaction:
def setRuleAsInactiveViaTransaction(collection: JSONCollection, ruleId: BSONObjectID): Future[Rule] = {
collection.findAndUpdate(
Json.obj(s"${Rule.ID}" -> ruleId),
Json.obj(
"$set" -> Json.obj(s"${Rule.Metadata}.${Metadata.Active}" -> false),
"$unset" -> Json.obj(s"${Rule.Metadata}.${Metadata.LastVersionExists}" -> "")),
fetchNewObject = true, upsert = false, sort = None, fields = None, bypassDocumentValidation = false, writeConcern = WriteConcern.Acknowledged, maxTime = None, collation = None, arrayFilters = Nil
).map(el => el.result[Rule].getOrElse {
val msg = s"Operation fail for updating ruleId = $ruleId"
logger.error(msg)
throw GeneralUpdateError(msg)
})
}
我正在使用以下依赖项:
播放:
"com.typesafe.play" % "sbt-plugin" % "2.7.2
Reactivemongo:
"org.reactivemongo" %% "play2-reactivemongo" % "0.18.8-play27"
解决。 (不适用于紧凑型)
序列化程序:
implicit object JsValueHandler extends BSONHandler[BSONValue, JsValue] {
implicit override def read(bson: BSONValue): JsValue = BSONFormats.toJSON(bson)
implicit override def write(j: JsValue): BSONValue = BSONFormats.toBSON(j).get
}
作为交易:
def asTransaction[Out](block: BSONCollection => Future[Out]): Future[Out] = {
for {
dbWithSession <- collection.db.startSession()
dbWithTx <- dbWithSession.startTransaction(None)
collectionWithTx = dbWithTx.collection[BSONCollection](collection.name)
out <- block(collectionWithTx)
_ <- dbWithTx.commitTransaction()
_ <- dbWithSession.endSession()
} yield out
}.recover {
case ex: Exception =>
logger.warn(s"asTransaction failed with ex: ${ex.getMessage}, rollback to previous state...")
throw GeneralDBErrorOnTx(ex.getMessage)
}
交易示例:
def `change visibility of ExistsRules and insert UpdateEvents`(oldRules: List[Rule], active: Boolean): Future[Unit] = {
ruleDao.asTransaction { collectionTx =>
for {
// (1) - $active old Rules
_ <- ruleDao.updateManyWithBsonCollection(
collectionTx,
filter = BSONDocument(s"${Rule.ID}" -> BSONDocument("$in" -> oldRules.map(_._id))),
update = BSONDocument("$set" -> BSONDocument(s"${Rule.Metadata}.${Metadata.Active}" -> active)))
// (2) - Sync Cache with Update Events
_ <- eventsService.addEvents(oldRules.map(rule => RuleEvent(rule.metadata.cacheKey, Update)))
} yield ()
}
}
尽情享受吧!
我在通过 JSONCollection 进行交易时遇到问题,出现以下错误:
JsResultException(errors:List((,List(JsonValidationError(List(CommandError[code=14, errmsg=BSON field 'OperationSessionInfo.txnNumber' is the wrong type 'int', expected type 'long', doc: {"operationTime":{"$time":1596894245,"$i":5,"$timestamp":{"t":1596894245,"i":5}},"ok":0,"errmsg":"BSON field 'OperationSessionInfo.txnNumber' is the wrong type 'int', expected type 'long'","code":14,"codeName":"TypeMismatch","$clusterTime":{"clusterTime":{"$time":1596894245,"$i":5,"$timestamp":{"t":1596894245,"i":5}},"signature":{"hash":{"$binary":"0000000000000000000000000000000000000000","$type":"00"},"keyId":0}}}]),WrappedArray())))))
我试图将我的项目更改为 BSONCollection 但遇到了一些麻烦,也许有解决方案可以克服 JSONCollection 的上述错误。 测试更新方法时也发生异常,但检查 insertOneViaTransaction 和 setRuleAsInactiveViaTransaction 已成功完成 这是我的交易代码: 更新:
def update(oldRule: ExistRuleDto): Future[UpdateResult] = {
val transaction = (collection: JSONCollection) => for {
newRule <- dao.insertOneViaTransaction(collection,oldRule.toUpdatedRule) // insert new with ref to old
oldRule <- dao.setRuleAsInactiveViaTransaction(collection,oldRule.id)
} yield UpdateResult(oldRule, newRule)
makeTransaction[UpdateResult](transaction)
}
制作交易:
def makeTransaction[Out](block: JSONCollection => Future[Out]): Future[Out] = for {
dbWithSession <- dao.collection.db.startSession()
dbWithTx <- dbWithSession.startTransaction(None)
coll = dbWithTx.collection[JSONCollection](dao.collection.name)
// Operations:
res <- block(coll)
_ <- dbWithTx.commitTransaction()
_ <- dbWithSession.endSession()
} yield res
insertOneViaTransaction:
def insertOneViaTransaction(collection: JSONCollection, rule: Rule): Future[Rule] = {
collection.insert.one(rule).map {
case DefaultWriteResult(true, 1, _, _, _, _) => rule
case err => throw GeneralDBError(s"$rule was not inserted, something went wrong: $err")
}.recover {
case WriteResult.Code(11000) => throw DuplicationError(s"$rule exist on DB")
case err => throw GeneralDBError(err.getMessage)
}
}
setRuleAsInactiveViaTransaction:
def setRuleAsInactiveViaTransaction(collection: JSONCollection, ruleId: BSONObjectID): Future[Rule] = {
collection.findAndUpdate(
Json.obj(s"${Rule.ID}" -> ruleId),
Json.obj(
"$set" -> Json.obj(s"${Rule.Metadata}.${Metadata.Active}" -> false),
"$unset" -> Json.obj(s"${Rule.Metadata}.${Metadata.LastVersionExists}" -> "")),
fetchNewObject = true, upsert = false, sort = None, fields = None, bypassDocumentValidation = false, writeConcern = WriteConcern.Acknowledged, maxTime = None, collation = None, arrayFilters = Nil
).map(el => el.result[Rule].getOrElse {
val msg = s"Operation fail for updating ruleId = $ruleId"
logger.error(msg)
throw GeneralUpdateError(msg)
})
}
我正在使用以下依赖项: 播放:
"com.typesafe.play" % "sbt-plugin" % "2.7.2
Reactivemongo:
"org.reactivemongo" %% "play2-reactivemongo" % "0.18.8-play27"
解决。 (不适用于紧凑型) 序列化程序:
implicit object JsValueHandler extends BSONHandler[BSONValue, JsValue] {
implicit override def read(bson: BSONValue): JsValue = BSONFormats.toJSON(bson)
implicit override def write(j: JsValue): BSONValue = BSONFormats.toBSON(j).get
}
作为交易:
def asTransaction[Out](block: BSONCollection => Future[Out]): Future[Out] = {
for {
dbWithSession <- collection.db.startSession()
dbWithTx <- dbWithSession.startTransaction(None)
collectionWithTx = dbWithTx.collection[BSONCollection](collection.name)
out <- block(collectionWithTx)
_ <- dbWithTx.commitTransaction()
_ <- dbWithSession.endSession()
} yield out
}.recover {
case ex: Exception =>
logger.warn(s"asTransaction failed with ex: ${ex.getMessage}, rollback to previous state...")
throw GeneralDBErrorOnTx(ex.getMessage)
}
交易示例:
def `change visibility of ExistsRules and insert UpdateEvents`(oldRules: List[Rule], active: Boolean): Future[Unit] = {
ruleDao.asTransaction { collectionTx =>
for {
// (1) - $active old Rules
_ <- ruleDao.updateManyWithBsonCollection(
collectionTx,
filter = BSONDocument(s"${Rule.ID}" -> BSONDocument("$in" -> oldRules.map(_._id))),
update = BSONDocument("$set" -> BSONDocument(s"${Rule.Metadata}.${Metadata.Active}" -> active)))
// (2) - Sync Cache with Update Events
_ <- eventsService.addEvents(oldRules.map(rule => RuleEvent(rule.metadata.cacheKey, Update)))
} yield ()
}
}
尽情享受吧!