当我在 couchbase 集合之间移动文档时,如何添加移动时间?
When i move document betwen couchbase collections, how do i add time of moving?
我收集了一些文件。有时我需要将某些文档移动到另一个集合(又名存档),我还需要添加 archiving time
到它。现在我用两个查询来做,但想减少到一个。
由于缺乏 NoSQL 经验,我很难做出任何假设,所以我寻求帮助。
query = f"""INSERT INTO bucket.scope.second_collection (KEY key_id, VALUE document)
SELECT META(doc).id AS key_id, doc AS document
FROM bucket.scope.first_collection AS doc
WHERE id = {};"""
cluster.query(query, QueryOptions(positional_parameters=[]))
time = datetime.now(tz)
query = f"""UPDATE bucket.scope.second_collection
SET date_of_moving = "{time}"
WHERE id = "{}";"""
cluster.query(query, QueryOptions(positional_parameters=[]))
您可以使用 OBJECT_ADD()
向现有 JSON 对象添加其他字段。
我不是 Python 开发人员,所以如果我有语法错误请原谅,但这里有一个 OBJECT_ADD
:
的例子
query = f"""INSERT INTO bucket.scope.second_collection (KEY key_id, VALUE document)
SELECT META(doc).id AS key_id, OBJECT_ADD(doc, ""date_of_moving"", {time}) AS document
FROM bucket.scope.first_collection AS doc
WHERE id = {};"""
只是为了完整起见,假设您有很多数据 1B+ 文档,并且您没有(或想要)collection 上的索引,服务器替代方案中还有另一个“low-code” .
假设您有两个 collections b01._default._default 和 b02._default._default 并且您需要将一些文档从 b01._default._default 移动到 b02._default._default 基于某些过滤器(当然还要添加时间戳)。
我们创建一个函数,比如 archive_and_enrich,将具有以下设置(请注意在可扩展的 [高级] 设置下是否可以增加工作人员的数量如果需要可以获得更多性能)
然后添加函数的 JavaScript 代码并保存函数。
function OnUpdate(doc, meta) {
// apply some filtering or business logic to only move the required documents
// here we just ensure the property "state" eixists and is equal to "ready_to_move"
// the filtering can be any complex business logic you need.
if (!(doc.state && doc.state === "ready_to_move")) return;
// update the local copy we received via DCP with a date stampe or Date
doc.archiving_time = Date.now(); // this is a # millis since epoch
// doc.archiving_time = new Date(); // this is a date string
// optional log what is going on to the Eventing Function's application log
log("key " + meta.id + " copied to b0, added property .archiving_time " + doc.archiving_time);
try {
// write to the destination collection via the alias dst_col
dst_col[meta.id] = doc;
try {
// delete from source collection via the alias src_col
delete src_col[meta.id];
} catch (e2) {
log("archived key " + meta.id + " but failed to remove it from the source, issue: " + e1);
}
} catch (e1) {
log("failed to archive and remove key " + meta.id + "issue: " + e1);
}
}
现在您可以部署您的功能以激活和丰富您的数据,例如在b01._default._default中,我制作了以下文档使用键“test:0000001”
{
"state": "not_ready_to_move",
"type": "test",
"id": "0000001",
"date": "Lorem ipsum dolor sit amet, consectetur adipiscing elit"
}
会被忽略,现在把这个文件的“state”改成“ready_to_move”(我用UI).
现在我们看到文档已从源 collection 复制到目标 collection,然后从源中删除。您在 b02._default._default 中剩下以下内容 - 使用相同的密钥:
{
"state": "ready_to_move",
"type": "test",
"id": "0000001",
"date": "Lorem ipsum dolor sit amet, consectetur adipiscing elit",
"archiving_time": 1644953704961
}
此外,JavaScript 中的第一个 log(...) 语句将发出如下内容:
2022-02-15T11:35:04.961-08:00 [INFO] "key test:0000001 copied to b0, added property .archiving_time 1644953704961"
在实际生产中,您不会希望发出此消息 - 只有那些关于异常的 log(...) 语句 - 因此您会删除并注释掉第一个 log(...) 语句。
一个非常适中的集群 3 x r52xlarge 每秒可以轻松归档超过 50K 个文档。但是一个经过良好调整的大型 Couchbase 集群可以通过这种 lambda 风格每秒归档大约 1/2 百万个文档(但是你的 Eventing 函数中的工作人员需要很高 - 比如 24,你应该有很多核心)。
您可能还想查看一个工具包,以将信息从桶范式移动到 collections 范式 https://github.com/jon-strabala/cb-buckets-to-collections
我收集了一些文件。有时我需要将某些文档移动到另一个集合(又名存档),我还需要添加 archiving time
到它。现在我用两个查询来做,但想减少到一个。
由于缺乏 NoSQL 经验,我很难做出任何假设,所以我寻求帮助。
query = f"""INSERT INTO bucket.scope.second_collection (KEY key_id, VALUE document)
SELECT META(doc).id AS key_id, doc AS document
FROM bucket.scope.first_collection AS doc
WHERE id = {};"""
cluster.query(query, QueryOptions(positional_parameters=[]))
time = datetime.now(tz)
query = f"""UPDATE bucket.scope.second_collection
SET date_of_moving = "{time}"
WHERE id = "{}";"""
cluster.query(query, QueryOptions(positional_parameters=[]))
您可以使用 OBJECT_ADD()
向现有 JSON 对象添加其他字段。
我不是 Python 开发人员,所以如果我有语法错误请原谅,但这里有一个 OBJECT_ADD
:
query = f"""INSERT INTO bucket.scope.second_collection (KEY key_id, VALUE document)
SELECT META(doc).id AS key_id, OBJECT_ADD(doc, ""date_of_moving"", {time}) AS document
FROM bucket.scope.first_collection AS doc
WHERE id = {};"""
只是为了完整起见,假设您有很多数据 1B+ 文档,并且您没有(或想要)collection 上的索引,服务器替代方案中还有另一个“low-code” .
假设您有两个 collections b01._default._default 和 b02._default._default 并且您需要将一些文档从 b01._default._default 移动到 b02._default._default 基于某些过滤器(当然还要添加时间戳)。
我们创建一个函数,比如 archive_and_enrich,将具有以下设置(请注意在可扩展的 [高级] 设置下是否可以增加工作人员的数量如果需要可以获得更多性能)
然后添加函数的 JavaScript 代码并保存函数。
function OnUpdate(doc, meta) {
// apply some filtering or business logic to only move the required documents
// here we just ensure the property "state" eixists and is equal to "ready_to_move"
// the filtering can be any complex business logic you need.
if (!(doc.state && doc.state === "ready_to_move")) return;
// update the local copy we received via DCP with a date stampe or Date
doc.archiving_time = Date.now(); // this is a # millis since epoch
// doc.archiving_time = new Date(); // this is a date string
// optional log what is going on to the Eventing Function's application log
log("key " + meta.id + " copied to b0, added property .archiving_time " + doc.archiving_time);
try {
// write to the destination collection via the alias dst_col
dst_col[meta.id] = doc;
try {
// delete from source collection via the alias src_col
delete src_col[meta.id];
} catch (e2) {
log("archived key " + meta.id + " but failed to remove it from the source, issue: " + e1);
}
} catch (e1) {
log("failed to archive and remove key " + meta.id + "issue: " + e1);
}
}
现在您可以部署您的功能以激活和丰富您的数据,例如在b01._default._default中,我制作了以下文档使用键“test:0000001”
{
"state": "not_ready_to_move",
"type": "test",
"id": "0000001",
"date": "Lorem ipsum dolor sit amet, consectetur adipiscing elit"
}
会被忽略,现在把这个文件的“state”改成“ready_to_move”(我用UI).
现在我们看到文档已从源 collection 复制到目标 collection,然后从源中删除。您在 b02._default._default 中剩下以下内容 - 使用相同的密钥:
{
"state": "ready_to_move",
"type": "test",
"id": "0000001",
"date": "Lorem ipsum dolor sit amet, consectetur adipiscing elit",
"archiving_time": 1644953704961
}
此外,JavaScript 中的第一个 log(...) 语句将发出如下内容:
2022-02-15T11:35:04.961-08:00 [INFO] "key test:0000001 copied to b0, added property .archiving_time 1644953704961"
在实际生产中,您不会希望发出此消息 - 只有那些关于异常的 log(...) 语句 - 因此您会删除并注释掉第一个 log(...) 语句。
一个非常适中的集群 3 x r52xlarge 每秒可以轻松归档超过 50K 个文档。但是一个经过良好调整的大型 Couchbase 集群可以通过这种 lambda 风格每秒归档大约 1/2 百万个文档(但是你的 Eventing 函数中的工作人员需要很高 - 比如 24,你应该有很多核心)。
您可能还想查看一个工具包,以将信息从桶范式移动到 collections 范式 https://github.com/jon-strabala/cb-buckets-to-collections