Couchbase 服务器允许并发文档突变,即使设置了 CAS?

Couchbase server allowing concurrent document mutation, even with CAS set?

我有一个对象 SomeObject,它代表一个对象,作为文档存储在 Couchbase 中。 SomeObject 有一个包含 CAS 值的 cas 变量。

我有这样的代码:

/* Get two identical objects from Couchbase, they'll have identical CAS value */
SomeObject someObjectA = getSomeObjectFromCouchbase(sameId);
SomeObject someObjectB = getSomeObjectFromCouchbase(sameId);

/* Make arbitrary modifications to the objects */
someObjectA.getListInObject().add(arbitraryValue1);
someObjectB.getListInObject().add(arbitraryValue2);

/* Convert SomeObject objects to JsonDocument objects, ensuring the CAS value is set */
JsonDocument jsonDocA = JsonDocument.create(someObjectA.getId(), JsonObject.fromJson(mapper.writeValueAsString(someObjectA)), someObjectA.getCas());
JsonDocument jsonDocB = JsonDocument.create(someObjectB.getId(), JsonObject.fromJson(mapper.writeValueAsString(someObjectB)), someObjectB.getCas());

/* Perform upserts on both JsonDocument objects; expectation is the second one should fail with CASMismatchException because the CAS value should have changed after the first upsert */
couchbaseDao.getDatasource().getBucket().upsert(jsonDocA, writeTimeout, TimeUnit.MILLISECONDS);
couchbaseDao.getDatasource().getBucket().upsert(jsonDocB, writeTimeout, TimeUnit.MILLISECONDS);

尽管我预计第二个更新插入应该因 CASMismatchException 而失败,我试图通过将代码包装在 try/catch 块中来捕获它,但它并没有发生。两次 upserts 都成功,并且服务器确实在两次 upsert 之后更改了 CAS 值。就好像它在 upsert 时甚至不检查 CAS 值,只是盲目地接受任何东西然后更新 CAS 值。

最终结果是 Couchbase 文档中的列表仅包含 arbitraryValue2,而缺少 arbitraryValue1,而我希望它具有 arbitraryValue1 而不是 arbitraryValue2(因为第二个 upsert 应该抛出 CASMismatchException)。是我做错了什么,还是服务器出了问题,没有正确处理 CAS?

CAS 只是用在 replace 方法中:

    JsonDocument doc = userRepository.getCouchbaseOperations().getCouchbaseBucket().get("1");
    JsonDocument doc2 = userRepository.getCouchbaseOperations().getCouchbaseBucket().get("1");
    doc.content().put("username", "Michael");

    userRepository.getCouchbaseOperations().getCouchbaseBucket().replace(doc);

    doc2.content().put("username", "denis");
    userRepository.getCouchbaseOperations().getCouchbaseBucket().replace(doc2);

    User userResult2 = userRepository.findById("1").get();
    System.out.println(userResult2.getUsername());

如果您尝试执行上面的代码,您将遇到以下异常:

aused by: com.couchbase.client.java.error.CASMismatchException: null
at com.couchbase.client.java.bucket.api.Mutate.call(Mutate.java:333) ~[java-client-2.7.11.jar:na]
at com.couchbase.client.java.bucket.api.Mutate.call(Mutate.java:308) ~[java-client-2.7.11.jar:na]
at rx.internal.operators.OnSubscribeMap$MapSubscriber.onNext(OnSubscribeMap.java:69) ~[rxjava-1.3.8.jar:1.3.8]
at rx.observers.Subscribers.onNext(Subscribers.java:235) ~[rxjava-1.3.8.jar:1.3.8]
at rx.internal.operators.OnSubscribeDoOnEach$DoOnEachSubscriber.onNext(OnSubscribeDoOnEach.java:101) ~[rxjava-1.3.8.jar:1.3.8]
at rx.internal.producers.SingleProducer.request(SingleProducer.java:65) ~[rxjava-1.3.8.jar:1.3.8]
at rx.internal.producers.ProducerArbiter.setProducer(ProducerArbiter.java:126) ~[rxjava-1.3.8.jar:1.3.8]
at rx.internal.operators.OnSubscribeTimeoutTimedWithFallback$TimeoutMainSubscriber.setProducer(OnSubscribeTimeoutTimedWithFallback.java:155) ~[rxjava-1.3.8.jar:1.3.8]
at rx.Subscriber.setProducer(Subscriber.java:205) ~[rxjava-1.3.8.jar:1.3.8]
at rx.internal.operators.OnSubscribeMap$MapSubscriber.setProducer(OnSubscribeMap.java:102) ~[rxjava-1.3.8.jar:1.3.8]
at rx.Subscriber.setProducer(Subscriber.java:205) ~[rxjava-1.3.8.jar:1.3.8]
at rx.Subscriber.setProducer(Subscriber.java:205) ~[rxjava-1.3.8.jar:1.3.8]
at rx.subjects.AsyncSubject.onCompleted(AsyncSubject.java:103) ~[rxjava-1.3.8.jar:1.3.8]
at com.couchbase.client.core.endpoint.AbstractGenericHandler.completeResponse(AbstractGenericHandler.java:508) ~[core-io-1.7.11.jar:na]
at com.couchbase.client.core.endpoint.AbstractGenericHandler.access[=11=]0(AbstractGenericHandler.java:86) ~[core-io-1.7.11.jar:na]
at com.couchbase.client.core.endpoint.AbstractGenericHandler.call(AbstractGenericHandler.java:526) ~[core-io-1.7.11.jar:na]
at rx.internal.schedulers.ScheduledAction.run(ScheduledAction.java:55) ~[rxjava-1.3.8.jar:1.3.8]
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) ~[na:na]
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) ~[na:na]
at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304) ~[na:na]
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) ~[na:na]
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) ~[na:na]
at java.base/java.lang.Thread.run(Thread.java:835) ~[na:na]
Caused by: rx.exceptions.OnErrorThrowable$OnNextValue: OnError while emitting onNext value: com.couchbase.client.core.message.kv.ReplaceResponse.class

对于这里的 GO 用户,我正在使用 upsertSpec 和 CAS 来找出文档中的任何并行突变。它对我来说似乎工作得很好。虽然不确定 java SDK,但理想情况下,无论语言如何,行为都应该相同。