无法在 couchbase 服务器社区版 3.0.1 上更新 6m+ 文档
Unable to update 6m+ documents on couchbase server community edition 3.0.1
我正在尝试更新 couchbase 服务器社区版 3.0.1 服务器集群中的 600 万多个文档。我正在使用最新的 java sdk 并尝试了各种方法,我可以从视图中读取一批文档,更新它们并将它们替换回存储桶。
在我看来,随着流程的进行,吞吐量变得太慢,甚至不到 300 op/s。我尝试使用多种方法来使用批量操作方法(使用 Observable)来加速它,但没有成功。我什至让进程 运行 几个小时后才看到超时异常。
我尝试的最后一个选项是从视图中将所有文档 ID 读入临时文件,以便我可以读回文件并更新记录。但是,在 3 小时后,从视图中仅读取了 170 万个 ID(仅约 157 items/sec!),数据库给出了超时异常。
请注意,couchbase 集群包含 3 个服务器 (Ubuntu 14.04),每个服务器具有 8 个内核,24GB RAM 和 1TB SSD,java 代码 运行 更新数据位于具有 4 个内核、16GB RAM 和 1TB SSD 的相同网络。并且此集群上没有其他负载 运行ning。
看来,从服务器端读取所有ID似乎是不可能的。我检查了网络吞吐量,数据库服务器仅以 1mbps 的速度提供数据。
下面是用于从视图中读取所有文档 ID 的示例代码:
final Bucket statsBucket = db.getStatsBucket();
int skipCount = 0;
int limitCount = 10000;
System.out.println("reading stats ids ...");
try (DataOutputStream out = new DataOutputStream(new FileOutputStream("rowIds.tmp")))
{
while (true)
{
ViewResult result = statsBucket.query(ViewQuery.from("Stats", "AllLogs").skip(skipCount).limit(limitCount).stale(Stale.TRUE));
Iterator<ViewRow> rows = result.iterator();
if (!rows.hasNext())
{
break;
}
while (rows.hasNext())
{
out.writeUTF(rows.next().id());
}
skipCount += limitCount;
System.out.println(skipCount);
}
}
即使使用批量操作 (Observable) 方法,我也试过了,但没有成功。还尝试将限制计数更改为 1000(不限制 java 应用程序在一段时间后变得疯狂,甚至 SSH 停止响应。
有办法吗?
我找到了解决方案。 ViewQuery.skip() 方法并不是真正的跳过,不应该用于分页。 skip() 方法将从视图的开头读取所有数据,并在读取记录数后才开始输出,就像链表一样。
解决方案是使用 startKey() 和 startKeyDocId()。进入这些方法的 ID 是您阅读的最后一个项目的 ID。从这里得到这个解决方案:http://tugdualgrall.blogspot.in/2013/10/pagination-with-couchbase.html
所以读取视图中所有项目的最终代码是:
final Bucket statsBucket = db.getStatsBucket();
int limitCount = 10000;
int skipCount = 0;
System.out.println("reading stats ids ...");
try (DataOutputStream out = new DataOutputStream(new FileOutputStream("rowIds.tmp")))
{
String lastKeyDocId = null;
while (true)
{
ViewResult result;
if (lastKeyDocId == null)
{
result = statsBucket.query(ViewQuery.from("Stats", "AllLogs").limit(limitCount).stale(Stale.FALSE));
}
else
{
result = statsBucket.query(ViewQuery.from("Stats", "AllLogs").limit(limitCount).stale(Stale.TRUE).startKey(lastKeyDocId).skip(1));
}
Iterator<ViewRow> rows = result.iterator();
if (!rows.hasNext())
{
break;
}
while (rows.hasNext())
{
lastKeyDocId = rows.next().id();
out.writeUTF(lastKeyDocId);
}
skipCount += limitCount;
System.out.println(skipCount);
}
}
我正在尝试更新 couchbase 服务器社区版 3.0.1 服务器集群中的 600 万多个文档。我正在使用最新的 java sdk 并尝试了各种方法,我可以从视图中读取一批文档,更新它们并将它们替换回存储桶。
在我看来,随着流程的进行,吞吐量变得太慢,甚至不到 300 op/s。我尝试使用多种方法来使用批量操作方法(使用 Observable)来加速它,但没有成功。我什至让进程 运行 几个小时后才看到超时异常。
我尝试的最后一个选项是从视图中将所有文档 ID 读入临时文件,以便我可以读回文件并更新记录。但是,在 3 小时后,从视图中仅读取了 170 万个 ID(仅约 157 items/sec!),数据库给出了超时异常。
请注意,couchbase 集群包含 3 个服务器 (Ubuntu 14.04),每个服务器具有 8 个内核,24GB RAM 和 1TB SSD,java 代码 运行 更新数据位于具有 4 个内核、16GB RAM 和 1TB SSD 的相同网络。并且此集群上没有其他负载 运行ning。
看来,从服务器端读取所有ID似乎是不可能的。我检查了网络吞吐量,数据库服务器仅以 1mbps 的速度提供数据。
下面是用于从视图中读取所有文档 ID 的示例代码:
final Bucket statsBucket = db.getStatsBucket();
int skipCount = 0;
int limitCount = 10000;
System.out.println("reading stats ids ...");
try (DataOutputStream out = new DataOutputStream(new FileOutputStream("rowIds.tmp")))
{
while (true)
{
ViewResult result = statsBucket.query(ViewQuery.from("Stats", "AllLogs").skip(skipCount).limit(limitCount).stale(Stale.TRUE));
Iterator<ViewRow> rows = result.iterator();
if (!rows.hasNext())
{
break;
}
while (rows.hasNext())
{
out.writeUTF(rows.next().id());
}
skipCount += limitCount;
System.out.println(skipCount);
}
}
即使使用批量操作 (Observable) 方法,我也试过了,但没有成功。还尝试将限制计数更改为 1000(不限制 java 应用程序在一段时间后变得疯狂,甚至 SSH 停止响应。
有办法吗?
我找到了解决方案。 ViewQuery.skip() 方法并不是真正的跳过,不应该用于分页。 skip() 方法将从视图的开头读取所有数据,并在读取记录数后才开始输出,就像链表一样。
解决方案是使用 startKey() 和 startKeyDocId()。进入这些方法的 ID 是您阅读的最后一个项目的 ID。从这里得到这个解决方案:http://tugdualgrall.blogspot.in/2013/10/pagination-with-couchbase.html
所以读取视图中所有项目的最终代码是:
final Bucket statsBucket = db.getStatsBucket();
int limitCount = 10000;
int skipCount = 0;
System.out.println("reading stats ids ...");
try (DataOutputStream out = new DataOutputStream(new FileOutputStream("rowIds.tmp")))
{
String lastKeyDocId = null;
while (true)
{
ViewResult result;
if (lastKeyDocId == null)
{
result = statsBucket.query(ViewQuery.from("Stats", "AllLogs").limit(limitCount).stale(Stale.FALSE));
}
else
{
result = statsBucket.query(ViewQuery.from("Stats", "AllLogs").limit(limitCount).stale(Stale.TRUE).startKey(lastKeyDocId).skip(1));
}
Iterator<ViewRow> rows = result.iterator();
if (!rows.hasNext())
{
break;
}
while (rows.hasNext())
{
lastKeyDocId = rows.next().id();
out.writeUTF(lastKeyDocId);
}
skipCount += limitCount;
System.out.println(skipCount);
}
}