当我在 couchbase 集合之间移动文档时,如何添加移动时间?

When i move document betwen couchbase collections, how do i add time of moving?

我收集了一些文件。有时我需要将某些文档移动到另一个集合(又名存档),我还需要添加 archiving time 到它。现在我用两个查询来做,但想减少到一个。

由于缺乏 NoSQL 经验,我很难做出任何假设,所以我寻求帮助。

query = f"""INSERT INTO bucket.scope.second_collection (KEY key_id, VALUE document)
            SELECT META(doc).id AS key_id, doc AS document
            FROM bucket.scope.first_collection AS doc
            WHERE id = {};"""
cluster.query(query, QueryOptions(positional_parameters=[]))

time = datetime.now(tz)
query = f"""UPDATE bucket.scope.second_collection
            SET date_of_moving = "{time}"
            WHERE id = "{}";"""
cluster.query(query, QueryOptions(positional_parameters=[]))

您可以使用 OBJECT_ADD() 向现有 JSON 对象添加其他字段。

我不是 Python 开发人员,所以如果我有语法错误请原谅,但这里有一个 OBJECT_ADD:

的例子
query = f"""INSERT INTO bucket.scope.second_collection (KEY key_id, VALUE document)
            SELECT META(doc).id AS key_id, OBJECT_ADD(doc, ""date_of_moving"", {time}) AS document
            FROM bucket.scope.first_collection AS doc
            WHERE id = {};"""

只是为了完整起见,假设您有很多数据 1B+ 文档,并且您没有(或想要)collection 上的索引,服务器替代方案中还有另一个“low-code” .

假设您有两个 collections b01._default._defaultb02._default._default 并且您需要将一些文档从 b01._default._default 移动到 b02._default._default 基于某些过滤器(当然还要添加时间戳)。

我们创建一个函数,比如 archive_and_enrich,将具有以下设置(请注意在可扩展的 [高级] 设置下是否可以增加工作人员的数量如果需要可以获得更多性能)

然后添加函数的 JavaScript 代码并保存函数。

function OnUpdate(doc, meta) {
    // apply some filtering or business logic to only move the required documents
    // here we just ensure the property "state" eixists and is equal to "ready_to_move"
    // the filtering can be any complex business logic you need.
    if (!(doc.state && doc.state === "ready_to_move")) return;
    
    // update the local copy we received via DCP with a date stampe or Date
    doc.archiving_time = Date.now();    // this is a # millis since epoch 
    // doc.archiving_time = new Date(); // this is a date string
    
    // optional log what is going on to the Eventing Function's application log
    log("key " + meta.id + " copied to b0, added property .archiving_time " + doc.archiving_time);
    
    
    try {
        // write to the destination collection via the alias dst_col
        dst_col[meta.id] = doc;
        try {
            // delete from source collection via the alias src_col
            delete src_col[meta.id];
        } catch (e2) {
            log("archived key " + meta.id + " but failed to remove it from the source, issue: " + e1);
        }
    } catch (e1) {
        log("failed to archive and remove key " + meta.id + "issue: " + e1);
    }
}

现在您可以部署您的功能以激活和丰富您的数据,例如在b01._default._default中,我制作了以下文档使用键“test:0000001”

{
  "state": "not_ready_to_move",
  "type": "test",
  "id": "0000001",
  "date": "Lorem ipsum dolor sit amet, consectetur adipiscing elit"
}

会被忽略,现在把这个文件的“state”改成“ready_to_move”(我用UI).

现在我们看到文档已从源 collection 复制到目标 collection,然后从源中删除。您在 b02._default._default 中剩下以下内容 - 使用相同的密钥:

{
  "state": "ready_to_move",
  "type": "test",
  "id": "0000001",
  "date": "Lorem ipsum dolor sit amet, consectetur adipiscing elit",
  "archiving_time": 1644953704961
}

此外,JavaScript 中的第一个 log(...) 语句将发出如下内容:

2022-02-15T11:35:04.961-08:00 [INFO] "key test:0000001 copied to b0, added property .archiving_time 1644953704961"

在实际生产中,您不会希望发出此消息 - 只有那些关于异常的 log(...) 语句 - 因此您会删除并注释掉第一个 log(...) 语句。

一个非常适中的集群 3 x r52xlarge 每秒可以轻松归档超过 50K 个文档。但是一个经过良好调整的大型 Couchbase 集群可以通过这种 lambda 风格每秒归档大约 1/2 百万个文档(但是你的 Eventing 函数中的工作人员需要很高 - 比如 24,你应该有很多核心)。

您可能还想查看一个工具包,以将信息从桶范式移动到 collections 范式 https://github.com/jon-strabala/cb-buckets-to-collections