带有 couchbase 的 rxjava 是否为非批量操作提供价值
Does rxjava with couchbase offer value for non-bulk opertions
新的 Couchbase SDK 使批量操作更易于使用,使用 rx-java 的性能更高。但是使用 rx 对单个值进行操作有什么价值吗?
如果我们看一个简单的 CAS / 插入操作,即如果值存在则执行 cas 否则执行插入并且 return 文档值
final String id = "id";
final String modified = "modified";
final int numCasRetries = 3;
Observable
.defer(() -> bucket.async().get(id))
.flatMap(document -> {
try {
if (document == null) {
JsonObject content = JsonObject.create();
content.put(modified, new Date().getTime());
document = bucket.insert(JsonDocument.create(id, content));
} else {
document.content().put(modified, new Date().getTime());
document = bucket.replace(document);
}
return Observable.just(document);
} catch (CASMismatchException e) {
return Observable.error(e);
}
})
.retry((count, error) -> {
// Only retry on CASMismatchException
return ((error instanceof CASMismatchException)
&& (count < numCASRetries));
})
.onErrorResumeNext(error -> {
return Observable.error(new Exception(error));
})
.toBlocking()
.single();
因此 toBlocking
将阻塞调用线程,直到结果可用。并且一次只能从 Couchbase 写入和读取一个值。所以我不明白为什么甚至这段代码会比
final String id = "id";
final String modified = "modified";
final int numCasRetries = 3;
JsonDocument document = null;
for (int i = 1; i <= numCasRetries; i++) {
document = bucket.get(id);
try {
if (document == null) {
JsonObject content = JsonObject.create();
content.put(modified, new Date().getTime());
document = bucket.insert(JsonDocument.create(id, content));
} else {
document.content().put(modified, new Date().getTime());
document = bucket.replace(document);
}
return document;
} catch (CASMismatchException e) {
if (i == numCasRetries) {
throw e;
}
}
}
如果我认为在这种情况下 rx 方法的可读性较差。
对于最终需要阻止的单个文档的操作,我倾向于同意你的第二个例子更清楚。
RxJava 在您大量使用异步处理时大放异彩,尤其是当您需要高级错误处理、重试场景、异步流组合时...
上一代 Couchbase Java SDK (1.4.x) 只是 Future
,并没有提供我们发现的优雅、强大和表达能力在 RxJava.
新的 Couchbase SDK 使批量操作更易于使用,使用 rx-java 的性能更高。但是使用 rx 对单个值进行操作有什么价值吗?
如果我们看一个简单的 CAS / 插入操作,即如果值存在则执行 cas 否则执行插入并且 return 文档值
final String id = "id";
final String modified = "modified";
final int numCasRetries = 3;
Observable
.defer(() -> bucket.async().get(id))
.flatMap(document -> {
try {
if (document == null) {
JsonObject content = JsonObject.create();
content.put(modified, new Date().getTime());
document = bucket.insert(JsonDocument.create(id, content));
} else {
document.content().put(modified, new Date().getTime());
document = bucket.replace(document);
}
return Observable.just(document);
} catch (CASMismatchException e) {
return Observable.error(e);
}
})
.retry((count, error) -> {
// Only retry on CASMismatchException
return ((error instanceof CASMismatchException)
&& (count < numCASRetries));
})
.onErrorResumeNext(error -> {
return Observable.error(new Exception(error));
})
.toBlocking()
.single();
因此 toBlocking
将阻塞调用线程,直到结果可用。并且一次只能从 Couchbase 写入和读取一个值。所以我不明白为什么甚至这段代码会比
final String id = "id";
final String modified = "modified";
final int numCasRetries = 3;
JsonDocument document = null;
for (int i = 1; i <= numCasRetries; i++) {
document = bucket.get(id);
try {
if (document == null) {
JsonObject content = JsonObject.create();
content.put(modified, new Date().getTime());
document = bucket.insert(JsonDocument.create(id, content));
} else {
document.content().put(modified, new Date().getTime());
document = bucket.replace(document);
}
return document;
} catch (CASMismatchException e) {
if (i == numCasRetries) {
throw e;
}
}
}
如果我认为在这种情况下 rx 方法的可读性较差。
对于最终需要阻止的单个文档的操作,我倾向于同意你的第二个例子更清楚。
RxJava 在您大量使用异步处理时大放异彩,尤其是当您需要高级错误处理、重试场景、异步流组合时...
上一代 Couchbase Java SDK (1.4.x) 只是 Future
,并没有提供我们发现的优雅、强大和表达能力在 RxJava.