PutIfExists fails if a record has been recently added

In ScalarDB, the library that adds ACID capabilities to Cassandra, I get the following error:

2020-09-24 18:51:33,607 [WARN] from com.scalar.db.transaction.consensuscommit.CommitHandler in ScalaTest-run-running-AllRepositorySpecs - preparing records failed
com.scalar.db.exception.storage.NoMutationException: no mutation was applied.

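I am running a test case in which I get a record to check that it does not exist, then add the record, then get it to check that it was added, then update it, and finally get it again to check that the value was updated.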

"update an answer if the answer exists" in {
      beforeEach()
      embeddedCassandraManager.executeStatements(cqlStartupStatements)

      val cassandraConnectionService = CassandraConnectionManagementService()
      val (cassandraSession, cluster) = cassandraConnectionService.connectWithCassandra("cassandra://localhost:9042/codingjedi", "codingJediCluster")
      //TODOM - pick the database and keyspace names from config file.
      cassandraConnectionService.initKeySpace(cassandraSession.get, "codingjedi")
      val transactionService = cassandraConnectionService.connectWithCassandraWithTransactionSupport("localhost", "9042", "codingJediCluster" /*,dbUsername,dbPassword*/)

      val repository = new AnswersTransactionRepository("codingjedi", "answer_by_user_id_and_question_id")

      // first key part is the answerer (partition key answered_by_user), as in the fixed update method
      val answerKey = AnswerKeys(repoTestEnv.answerTestEnv.answerOfAPracticeQuestion.answeredBy.get.answerer_id,
        repoTestEnv.answerTestEnv.answerOfAPracticeQuestion.question_id,
        Some(repoTestEnv.answerTestEnv.answerOfAPracticeQuestion.answer_id.get))

      logger.trace(s"checking if answer already exists")
      val distributedTransactionBefore = transactionService.get.start()

      val resultBefore = repository.get(distributedTransactionBefore, answerKey) //answer should not exist
      distributedTransactionBefore.commit()

      resultBefore.isLeft mustBe true
      resultBefore.left.get.isInstanceOf[AnswerNotFoundException] mustBe true

      logger.trace(s"no answer found. adding answer")
      val distributedTransactionDuring = transactionService.get.start()

      repository.add(distributedTransactionDuring, repoTestEnv.answerTestEnv.answerOfAPracticeQuestion)//add answer
      distributedTransactionDuring.commit()
      logger.trace(s"answer added")

      val distributedTransactionAfter = transactionService.get.start()

      val result = repository.get(distributedTransactionAfter, answerKey) //now answer should exist
      distributedTransactionAfter.commit()

      result mustBe (Right(repoTestEnv.answerTestEnv.answerOfAPracticeQuestion))
      logger.trace(s"got answer from repo ${result}")

      val updatedNotes = if(repoTestEnv.answerTestEnv.answerOfAPracticeQuestion.notes.isDefined)
        Some(repoTestEnv.answerTestEnv.answerOfAPracticeQuestion.notes.get+"updated") else Some("updated notes")
      val updatedAnswer = repoTestEnv.answerTestEnv.answerOfAPracticeQuestion.copy(notes=updatedNotes) //updated answer
      logger.trace(s"old notes ${repoTestEnv.answerTestEnv.answerOfAPracticeQuestion.notes} vs new notes ${updatedNotes}")
      logger.trace(s"updated answer ${updatedAnswer}")

      val distributedTransactionForUpdate = transactionService.get.start()
      val resultOfupdate = repository.update(distributedTransactionForUpdate,updatedAnswer) //update answer
      distributedTransactionForUpdate.commit() //fails here

      logger.trace(s"update done. getting answer again")

      val distributedTransactionAfterUpdate = transactionService.get.start()

      val resultAfterUpdate = repository.get(distributedTransactionAfterUpdate, answerKey)
      distributedTransactionAfterUpdate.commit() //commit the transaction that performed the get

      resultAfterUpdate mustBe (Right(updatedAnswer))
      logger.trace(s"got result after update ${resultAfterUpdate}")

      afterEach()
    }

The update method calls add with the PutIfExists condition:

 def update(transaction:DistributedTransaction, answer:AnswerOfAPracticeQuestion) = {
    logger.trace(s"updating answer value ${answer}")
    //checktest-update an answer if the answer exists
    add(transaction,answer, new PutIfExists)
  }


def add(transaction:DistributedTransaction,answer:AnswerOfAPracticeQuestion,mutationCondition:MutationCondition = new PutIfNotExists()) = {
    logger.trace(s"adding answer ${answer} with mutation state ${mutationCondition}")
    val pAnswerKey = new Key(new TextValue("answered_by_user", answer.answeredBy.get.answerer_id.toString),
      new TextValue("question_id",answer.question_id.toString))

    //to check duplication, both partition and clustering keys need to be present
    //val cAnswerKey = new Key(new TextValue("answer_id",answer.answer_id.toString))

    //logger.trace(s"created keys. ${pAnswerKey}, ${cAnswerKey}")
    val imageData = answer.image.map(imageList=>imageList).getOrElse(List())
    logger.trace(s"will check in ${keyspaceName},${tablename}")
    val putAnswer: Put = new Put(pAnswerKey/*,cAnswerKey*/)
      .forNamespace(keyspaceName)
      .forTable(tablename)
      .withCondition(mutationCondition)
      .withValue(new TextValue("answer_id", answer.answer_id.get.toString))
      .withValue(new TextValue("image", convertImageToString(imageData)))
      .withValue(new TextValue("answer", convertAnswersFromModelToString(answer.answer)))
      .withValue(new BigIntValue("creation_year", answer.creationYear.getOrElse(0)))
      .withValue(new BigIntValue("creation_month", answer.creationMonth.getOrElse(0)))
      .withValue(new TextValue("notes", answer.notes.getOrElse("")))

    logger.trace(s"putting answer ${putAnswer}")
    //checktest-add answer to repository
    //checktest-not add answer to repository if duplicate
    transaction.put(putAnswer)
  }
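For contrast with the transactional behavior in this question, here is a minimal in-memory sketch of what the two conditions mean for a single, non-transactional write, roughly like Cassandra's INSERT ... IF NOT EXISTS and UPDATE ... IF EXISTS. Everything in it (ConditionalStore, IfExists, IfNotExists) is hypothetical and not part of ScalarDB; it only illustrates that, on a plain store, an exists-conditioned update of a record that was just inserted would succeed.

```scala
// Hypothetical in-memory model of single-record conditional writes (not ScalarDB code).
sealed trait Condition
case object IfNotExists extends Condition // like CQL INSERT ... IF NOT EXISTS
case object IfExists extends Condition    // like CQL UPDATE ... IF EXISTS

final class ConditionalStore[K, V] {
  private val data = scala.collection.mutable.Map.empty[K, V]

  /** Applies the write only if the condition holds; returns whether it was applied. */
  def put(key: K, value: V, condition: Condition): Boolean = {
    val exists = data.contains(key)
    val applies = condition match {
      case IfNotExists => !exists
      case IfExists    => exists
    }
    if (applies) data.update(key, value)
    applies
  }

  def get(key: K): Option[V] = data.get(key)
}

val store = new ConditionalStore[String, String]
val inserted = store.put("answer-1", "some notes", IfNotExists)     // key was absent
val updated  = store.put("answer-1", "some notesupdated", IfExists) // key now exists
val reinsert = store.put("answer-1", "again", IfNotExists)          // key already exists
```

So the condition itself is not what rejects the update; the log below shows the failure happening while the transaction prepares its records.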

Why do I still get the error even though the notes field differs between the existing answer and the updated answer?

The error trace is below (note that it says IF NOT EXISTS!). Shouldn't it be IF EXISTS? There is also a trace saying "there was a hit in the statement cache for [INSERT INTO codingjedi.answer_by_user_id_and_question_id (answered_by_user,question_id,tx_id,tx_state,tx_prepared_at,answer_id,image,answer,creation_year,creation_month,notes,tx_version) VALUES (?,?,?,?,?,?,?,?,?,?,?,?) IF NOT EXISTS;]". Does this mean the previous put is still in the cache and is causing the conflict?

2020-09-24 18:51:33,593 [DEBUG] from com.scalar.db.storage.cassandra.StatementHandler in ScalaTest-run-running-AllRepositorySpecs - query to prepare : [INSERT INTO codingjedi.answer_by_user_id_and_question_id (answered_by_user,question_id,tx_id,tx_state,tx_prepared_at,answer_id,image,answer,creation_year,creation_month,notes,tx_version) VALUES (?,?,?,?,?,?,?,?,?,?,?,?) IF NOT EXISTS;].
2020-09-24 18:51:33,593 [DEBUG] from com.scalar.db.storage.cassandra.StatementHandler in ScalaTest-run-running-AllRepositorySpecs - there was a hit in the statement cache for [INSERT INTO codingjedi.answer_by_user_id_and_question_id (answered_by_user,question_id,tx_id,tx_state,tx_prepared_at,answer_id,image,answer,creation_year,creation_month,notes,tx_version) VALUES (?,?,?,?,?,?,?,?,?,?,?,?) IF NOT EXISTS;].
2020-09-24 18:51:33,593 [DEBUG] from com.scalar.db.storage.cassandra.ValueBinder in ScalaTest-run-running-AllRepositorySpecs - Optional[11111111-1111-1111-1111-111111111111] is bound to 0
2020-09-24 18:51:33,593 [DEBUG] from com.scalar.db.storage.cassandra.ValueBinder in ScalaTest-run-running-AllRepositorySpecs - Optional[11111111-1111-1111-1111-111111111111] is bound to 1
2020-09-24 18:51:33,593 [DEBUG] from com.scalar.db.storage.cassandra.ValueBinder in ScalaTest-run-running-AllRepositorySpecs - Optional[468492df-0960-4160-8391-27fe7fa626c5] is bound to 2
2020-09-24 18:51:33,593 [DEBUG] from com.scalar.db.storage.cassandra.ValueBinder in ScalaTest-run-running-AllRepositorySpecs - 1 is bound to 3
2020-09-24 18:51:33,593 [DEBUG] from com.scalar.db.storage.cassandra.ValueBinder in ScalaTest-run-running-AllRepositorySpecs - 1600969893592 is bound to 4
2020-09-24 18:51:33,593 [DEBUG] from com.scalar.db.storage.cassandra.ValueBinder in ScalaTest-run-running-AllRepositorySpecs - Optional[11111111-1111-1111-1111-111111111111] is bound to 5
2020-09-24 18:51:33,593 [DEBUG] from com.scalar.db.storage.cassandra.ValueBinder in ScalaTest-run-running-AllRepositorySpecs - Optional[{"image":["image1binarydata","image2binarydata"]}] is bound to 6
2020-09-24 18:51:33,593 [DEBUG] from com.scalar.db.storage.cassandra.ValueBinder in ScalaTest-run-running-AllRepositorySpecs - Optional[{"answer":[{"filename":"c.js","answer":"some answer"}]}] is bound to 7
2020-09-24 18:51:33,593 [DEBUG] from com.scalar.db.storage.cassandra.ValueBinder in ScalaTest-run-running-AllRepositorySpecs - 2019 is bound to 8
2020-09-24 18:51:33,593 [DEBUG] from com.scalar.db.storage.cassandra.ValueBinder in ScalaTest-run-running-AllRepositorySpecs - 12 is bound to 9
2020-09-24 18:51:33,593 [DEBUG] from com.scalar.db.storage.cassandra.ValueBinder in ScalaTest-run-running-AllRepositorySpecs - Optional[some notesupdated] is bound to 10
2020-09-24 18:51:33,593 [DEBUG] from com.scalar.db.storage.cassandra.ValueBinder in ScalaTest-run-running-AllRepositorySpecs - 1 is bound to 11
2020-09-24 18:51:33,607 [WARN] from com.scalar.db.transaction.consensuscommit.CommitHandler in ScalaTest-run-running-AllRepositorySpecs - preparing records failed
com.scalar.db.exception.storage.NoMutationException: no mutation was applied.
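The ValueBinder lines after the cache hit show fresh values being bound to positions 0 to 11, including "some notesupdated", which suggests the statement cache holds the prepared query template rather than a previous row. A minimal sketch of such a cache, assuming it is keyed by the query text (StatementCache, Prepared and Bound are hypothetical names, not ScalarDB's classes):

```scala
import scala.collection.mutable

// Hypothetical prepared-statement cache keyed by CQL text; it stores templates, not rows.
final case class Prepared(cql: String)
final case class Bound(statement: Prepared, values: Vector[Any])

final class StatementCache {
  private val cache = mutable.Map.empty[String, Prepared]
  var hits = 0

  def prepare(cql: String): Prepared =
    cache.get(cql) match {
      case Some(p) =>
        hits += 1 // same query text seen before: reuse the template
        p
      case None =>
        val p = Prepared(cql)
        cache.update(cql, p)
        p
    }
}

val insertIfNotExists = "INSERT INTO t (k, notes) VALUES (?,?) IF NOT EXISTS;"
val statements = new StatementCache
// Each execution binds its own values to the shared template.
val first  = Bound(statements.prepare(insertIfNotExists), Vector("k1", "some notes"))
val second = Bound(statements.prepare(insertIfNotExists), Vector("k1", "some notesupdated"))
```

Under this assumption a cache hit only means the same INSERT ... IF NOT EXISTS text was prepared before; it says nothing about which row is written.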

UPDATE: more traces

For the first put, putAnswer is:

putting answer Put{namespace=Optional[codingjedi], table=Optional[answer_by_user_id_and_question_id], partitionKey=Key{TextValue{name=answered_by_user, value=Optional[11111111-1111-1111-1111-111111111111]}, TextValue{name=question_id, value=Optional[11111111-1111-1111-1111-111111111111]}}, clusteringKey=Optional.empty, values={answer_id=TextValue{name=answer_id, value=Optional[11111111-1111-1111-1111-111111111111]}, image=TextValue{name=image, value=Optional[{"image":["image1binarydata","image2binarydata"]}]}, answer=TextValue{name=answer, value=Optional[{"answer":[{"filename":"c.js","answer":"some answer"}]}]}, creation_year=BigIntValue{name=creation_year, value=2019}, creation_month=BigIntValue{name=creation_month, value=12}, notes=TextValue{name=notes, value=Optional[some notes]}}, consistency=SEQUENTIAL, condition=Optional[com.scalar.db.api.PutIfNotExists@21bf308]}

For the second put, putAnswer is:

putting answer Put{namespace=Optional[codingjedi], table=Optional[answer_by_user_id_and_question_id], partitionKey=Key{TextValue{name=answered_by_user, value=Optional[11111111-1111-1111-1111-111111111111]}, TextValue{name=question_id, value=Optional[11111111-1111-1111-1111-111111111111]}}, clusteringKey=Optional.empty, values={answer_id=TextValue{name=answer_id, value=Optional[11111111-1111-1111-1111-111111111111]}, image=TextValue{name=image, value=Optional[{"image":["image1binarydata","image2binarydata"]}]}, answer=TextValue{name=answer, value=Optional[{"answer":[{"filename":"c.js","answer":"some answer"}]}]}, creation_year=BigIntValue{name=creation_year, value=2019}, creation_month=BigIntValue{name=creation_month, value=12}, notes=TextValue{name=notes, value=Optional[some notesupdated]}}, consistency=SEQUENTIAL, condition=Optional[com.scalar.db.api.PutIfExists@2e057637]}

The notes field changed from notes=TextValue{name=notes, value=Optional[some notes]} to notes=TextValue{name=notes, value=Optional[some notesupdated]}.

When the second put is executed, you can see that the mutation condition used is PutIfNotExists:

2020-09-25 12:35:34,188 [DEBUG] from com.scalar.db.storage.cassandra.Cassandra in ScalaTest-run-running-AllRepositorySpecs - executing put operation with Put{namespace=Optional[codingjedi], table=Optional[answer_by_user_id_and_question_id], partitionKey=Key{TextValue{name=answered_by_user, value=Optional[11111111-1111-1111-1111-111111111111]}, TextValue{name=question_id, value=Optional[11111111-1111-1111-1111-111111111111]}}, clusteringKey=Optional.empty, values={tx_id=TextValue{name=tx_id, value=Optional[c6bc39e9-656a-440c-8f68-af6005f37f7c]}, tx_state=IntValue{name=tx_state, value=1}, tx_prepared_at=BigIntValue{name=tx_prepared_at, value=1601033734188}, answer_id=TextValue{name=answer_id, value=Optional[11111111-1111-1111-1111-111111111111]}, image=TextValue{name=image, value=Optional[{"image":["image1binarydata","image2binarydata"]}]}, answer=TextValue{name=answer, value=Optional[{"answer":[{"filename":"c.js","answer":"some answer"}]}]}, creation_year=BigIntValue{name=creation_year, value=2019}, creation_month=BigIntValue{name=creation_month, value=12}, notes=TextValue{name=notes, value=Optional[some notesupdated]}, tx_version=IntValue{name=tx_version, value=1}}, consistency=LINEARIZABLE, condition=Optional[com.scalar.db.api.PutIfNotExists@21bf308]}

ScalarDB does not allow blind writes to existing records. There seems to be no get before the update.

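I think this process should read the current value and update it within the same transaction. In this code, the get and the update run in separate transactions, so atomicity between them is not guaranteed.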
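The IF NOT EXISTS in the failing statement can be explained with a simplified model of the prepare phase. The sketch below assumes, rather than quotes from ScalarDB's source, that a write to a record the transaction read earlier is conditioned on the tx_version and tx_id it saw, while a write to a record it never read is treated as a new record with tx_version 1 and conditioned on the record not existing. That assumption matches the log above, which shows tx_version=1 and PutIfNotExists for the blind update. All names are hypothetical.

```scala
// Hypothetical model of choosing the storage-level condition at prepare time (not ScalarDB code).
final case class ReadRecord(txId: String, version: Int)

sealed trait StorageCondition
final case class IfVersionAndTxIdMatch(version: Int, txId: String) extends StorageCondition
case object IfRecordNotExists extends StorageCondition

final case class PreparedWrite(newVersion: Int, condition: StorageCondition)

/** readInTx is what this transaction read for the key, or None if it never read it. */
def composePrepare(readInTx: Option[ReadRecord]): PreparedWrite =
  readInTx match {
    // Overwrite only if no other transaction changed the record since it was read.
    case Some(r) => PreparedWrite(r.version + 1, IfVersionAndTxIdMatch(r.version, r.txId))
    // Never read: written as a new record with version 1, guarded by "IF NOT EXISTS".
    case None => PreparedWrite(1, IfRecordNotExists)
  }

// Blind update, as in the original test (no get in the updating transaction).
val blind = composePrepare(None)
// Update after a get in the same transaction, as in the fixed update method.
val afterGet = composePrepare(Some(ReadRecord("earlier-tx-id", 1)))
```

For the blind update the answer row already exists, because an earlier transaction committed it, so the IF NOT EXISTS condition fails, Cassandra applies nothing, and the commit surfaces as NoMutationException.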

Following Yuji's comment below, I changed the update method to do a get before the put, and it now runs successfully:

def update(transaction:DistributedTransaction, answer:AnswerOfAPracticeQuestion) = {
    logger.trace(s"updating answer value ${answer}")
    val key = AnswerKeys(answer.answeredBy.get.answerer_id,answer.question_id,answer.answer_id)
    val result = get(transaction,key)
    if(result.isLeft) throw result.left.get else add(transaction,answer, new PutIfExists())
  }

But I don't understand why ScalarDB requires a get before an update or delete.

Because the protocol is based on Snapshot Isolation, a record must first be read into the transaction's snapshot before it can be updated.
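A toy, single-threaded model of this snapshot-based approach may make the rule concrete. It assumes that get records the version it saw in the transaction's read set, and that commit checks, for every written key, that the record is unchanged since it was read, or still absent if it was never read. The names (VersionedStore, Tx) are hypothetical; ScalarDB's real protocol also writes prepared records carrying tx_state and tx_prepared_at, as the log shows, which this sketch omits.

```scala
import scala.collection.mutable

// Hypothetical toy model of snapshot-based optimistic transactions (not ScalarDB code).
final case class Versioned[V](value: V, version: Int)

final class VersionedStore[K, V] {
  val data: mutable.Map[K, Versioned[V]] = mutable.Map.empty
  def begin(): Tx[K, V] = new Tx[K, V](this)
}

final class Tx[K, V](store: VersionedStore[K, V]) {
  // Snapshot of what this transaction saw for each key it read (None = record absent).
  private val readSet = mutable.Map.empty[K, Option[Versioned[V]]]
  private val writeSet = mutable.Map.empty[K, V]

  def get(key: K): Option[V] = {
    val seen = readSet.getOrElseUpdate(key, store.data.get(key))
    writeSet.get(key).orElse(seen.map(_.value))
  }

  // Writes are only buffered; nothing reaches the store before commit.
  def put(key: K, value: V): Unit = writeSet.update(key, value)

  /** Applies all writes if every check passes; otherwise applies nothing. */
  def commit(): Boolean = {
    val ok = writeSet.keys.forall { key =>
      (readSet.get(key).flatten, store.data.get(key)) match {
        case (Some(seen), Some(current)) => seen.version == current.version // unchanged since read
        case (None, None) => true // new record: "IF NOT EXISTS" holds
        case _ => false // blind write to an existing record, or record appeared/vanished
      }
    }
    if (ok) writeSet.foreach { case (key, value) =>
      val next = store.data.get(key).map(_.version + 1).getOrElse(1)
      store.data.update(key, Versioned(value, next))
    }
    ok
  }
}

val store = new VersionedStore[String, String]

val addTx = store.begin()
addTx.put("answer-1", "some notes")
val added = addTx.commit() // new record, version 1

val blindTx = store.begin()
blindTx.put("answer-1", "some notesupdated")
val blindCommitted = blindTx.commit() // rejected: existing record was never read

val updateTx = store.begin()
val before = updateTx.get("answer-1")
updateTx.put("answer-1", "some notesupdated")
val updated = updateTx.commit() // read-modify-write in one transaction, version 2

// Two transactions read the same version; only the first to commit wins.
val txA = store.begin()
val txB = store.begin()
txA.get("answer-1"); txB.get("answer-1")
txA.put("answer-1", "from A"); txB.put("answer-1", "from B")
val aCommitted = txA.commit()
val bCommitted = txB.commit()
```

The blind write is rejected for the same reason as the update in the question's test, the get-then-put in one transaction succeeds, and the version recorded by the get is what lets commit detect a conflicting writer.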