使用 XDCR 在多数据中心复制 Couchbase 数据时,如何避免通过 Kafka Connect 和 Couchbase Eventing 将重复数据发布到 Kafka

How to avoid publishing duplicate data to Kafka via Kafka Connect and Couchbase Eventing, when replicate Couchbase data on multi data center with XDCR

我的桶是:

另外,我有一个 kafka couchbase 连接器,可以将数据从 MyEventingBucket 推送到 kafka 主题。

当我们只有一个数据中心时,没有任何问题。现在,我们拥有三个数据中心。我们使用 XDCR 在数据中心之间复制我们的数据,并且我们以主动-主动方式工作。因此,写入请求可以来自任何数据中心。

当数据复制到其他数据中心时,事件服务在所有数据中心工作,相同的数据通过Kafka连接器在Kafka上推送三次(因为我们有三个数据中心)。

如何避免向Kafka推送重复数据?

Ps:当然,我们可以运行一个事件服务或者Kafka连接器只在一个数据中心。因此,我们可以在 Kafka 上发布一次数据。但这不是一个好的解决方案。因为当这个数据中心出现问题时,我们会受到影响。这是使用多数据中心的主要原因。

显然,在一个完美的世界中,XDCR 只会在复制的存储桶上与 Eventing 一起工作。

我整理了一个基于事件的解决方法来解决主动/主动 XDCR 配置中的问题 - 它有点复杂,所以我认为工作代码将是最好的。这是执行 Matthew Groves 提到的解决方案的一种方法。

文档被标记,并且您通过 XDCR“cluster_state”文档共享(请参阅代码中的注释)以协调哪个集群是“主要”集群,因为您只希望一个集群触发事件功能。

我将为 7.0.0 版提供事件函数“xcdr_supression_700”的代码,稍作改动后它也适用于 6.6.5。

请注意,较新的 Couchbase 版本具有更多功能 WRT Eventing 并允许简化 Eventing 功能,例如:

  • 6.6+ 中的高级桶访问器,特别是 couchbase.replace() 可以使用 CAS 并防止潜在的比赛(注意三项赛不允许 锁定)。
  • 计时器已得到改进并且可以在 6.6+ 中被覆盖,从而简化了确定计时器是否为孤立计时器所需的逻辑。
  • 7.X 中的常量别名绑定允许 JavaScript 集群之间的事件代码相同,仅更改每个集群的设置。

设置 XDCR 和事件

以下代码将成功抑制名为“common”的存储桶或 7 中的所有额外事件突变。0.X 具有 active/active XDCR 复制的“common._default._default”键空间.

该示例适用于两 (2) 个集群,但可以扩展。此代码特定于 7.0(如果需要,我可以提供 6.5.1 变体 - 请私信我)

PS :它唯一做的就是记录一条消息(在正在处理函数的集群中)。您可以只设置两个单节点集群,我将集群命名为“couch01”和“couch03”。非常容易设置和测试,以确保您的存储桶中的突变仅在两个具有 active/active XDCR

的集群中处理一次

事件函数是通用的 WRT JavaScript 但它确实需要在每个集群上使用不同的常量别名,请参阅 OnUpdate(doc,meta) 入口点下方的注释。

/*
PURPOSE suppress duplicate mutations by Eventing when we use an Active/Active XDCR setup

Make two clusters "couch01" and "couch03" each with bucket "common" (if 7.0.0 keyspace "common._default._default")
On cluster "couch01", setup XDCR replication of common from "couch01" =>  "couch03"
On cluster "couch03", setup XDCR replication of common from "couch03" =>  "couch01"
This is an active / active XDCR configuration.

We process all documents in "common" except those with "type": "cluster_state" the documents can contain anything

{
  "data": "...something..."
}

We add "owner": "cluster" to every document, in this sample I have two clusters "couch01" and "couch03"
We add "crc": "crc" to every document, in this sample I have two clusters "couch01" and "couch03"
If either the "owner" or "crc" property does not exist we will add the properties ourselves to the document

{
  "data": "...something...",
  "owner": "couch01",
  "crc": "a63a0af9428f6d2d"
}

A document must exist with KEY "cluster_state" when things are perfect it looks lke the following:

{
    "type": "cluster_state",
    "ci_offline": {"couch01": false, "couch03": false },
    "ci_backups": {"couch03": "couch01",  "couch01": "couch03" }
}

Note ci_offline is an indicator that the cluster is down, for example is a document has an "owner": "couch01"
and "ci_offline": {"couch01": true, "couch03": false } then the cluster "couch02" will take ownership and the
documents will be updated accordingly.  An external process (ping/verify CB is running, etc.) runs every minute
or so and then updates the "cluster_state" if a change in cluster state occurs, however prior to updating
ci_offline to "true" the eventing Function on that cluster should either be undeployed or paused.  In addition
re-enabeling the cluster setting the flag ci_offline to "false" must be done before the Function is resumed or
re-deployed.

The ci_backups tells which cluster is a backup for which cluster, pretty simple for two clusters.

If you have timers when the timer fires you MUST check if the doc.owner is correct if not ignore the timer, i.e.
do nothing.  In addition, when you "take ownership" you will need to create a new timer.  Finally, all timers should
have an id such that if we ping pong ci_offline that the timer will be overwritten, this implies 6.6.0+ else you
need do even to more work to suppress orphaned timers.

The 'near' identical Function will be deployed on both clusters "couch01" and "couch02" make sure you have
a constant binding for 7.0.0 THIS_CLUSTER "couch01" or THIS_CLUSTER "couch02", or for 6.6.0 uncomment the
appropriate var statement at the top of OnUpdate().  Next you should have a bucket binding of src_bkt to
keyspace "common._default._default" for 7.0.0 or to bucket "common" in 6.6.0 in mode read+write.
*/

function OnUpdate(doc, meta) {
    // ********************************
    // MUST MATCH THE CLUSTER AND ALSO THE DOC "cluster_state"
    // *********
    // var THIS_CLUSTER = "couch01"; // this could be a constant binding in 7.0.0, in 6.X we uncomment one of these to match he cluster name
    // var THIS_CLUSTER = "couch03"; // this could be a constant binding in 7.0.0, in 6.X we uncomment one of these to match he cluster name
    // ********************************

    if (doc.type === "cluster_state") return;

    var cs = src_bkt["cluster_state"];  // extra bucket op read the state of the clusters
    if (cs.ci_offline[THIS_CLUSTER] === true) return; // this cluster is marked offline do nothing.
    // ^^^^^^^^
    // IMPORTANT: when an external process marks the cs.ci_offline[THIS_CLUSTER] back to false (as
    // in this cluster becomes online) it is assumed that the Eventing function was undeployed
    // (or was paused) when it was set "true" and will be redeployed or resumed AFTER it is set "false".
    // This order of this procedure is very important else  mutations will be lost.

    var orig_owner = doc.owner;
    var fallback_cluster = cs.ci_backups[THIS_CLUSTER]; // this cluster is the fallback for the fallback_cluster

   /*
    if (!doc.crc && !doc.owner) {
        doc.owner = fallback_cluster;
        src_bkt[meta.id] = doc;
        return; // the fallback cluster NOT THIS CLUSTER is now the owner, the fallback
                // cluster will then add the crc property, as we just made a mutation in that
                // cluster via XDCR
    }
   */

    if (!doc.crc && !doc.owner) {
        doc.owner = THIS_CLUSTER;
        orig_owner = doc.owner;
        // use CAS to avoid a potential 'race' between clusters
        var result = couchbase.replace(src_bkt,meta,doc);
        if (result.success) {
            // log('success adv. replace: result',result);
        } else {
            // log('lost to other cluster failure adv. replace: id',meta.id,'result',result);
            // re-read
            doc = src_bkt[meta.id];
            orig_owner = doc.owner;
        }
    }

    // logic to take over a failed clusters data, requires updating "cluster_state"
    if (orig_owner !== THIS_CLUSTER) {
        if ( orig_owner === fallback_cluster && cs.ci_offline[fallback_cluster] === true) {
            doc.owner = THIS_CLUSTER; // Here update the doc's owner
            src_bkt[meta.id] = doc;   // This cluster now will now process this doc's mutations.
        } else {
            return; // this isn't the fallback cluster.
        }
    }

    var crc_changed = false;
    if (!doc.crc) {
        var cur_owner = doc.owner;
        delete doc.owner;
        doc.crc = crc64(doc);  // crc DOES NOT include doc.owner && doc.crc
        doc.owner = cur_owner;
        crc_changed = true;
    } else {
        var cur_owner = doc.owner;
        var cur_crc = doc.crc;
        delete doc.owner;
        delete doc.crc;
        doc.crc = crc64(doc); // crc DOES NOT include doc.owner && doc.crc
        doc.owner = cur_owner;
        if (cur_crc != doc.crc) {
            crc_changed = true;
        } else {
            return;
        }
    }

    if (crc_changed) {
        // update the data with the new crc, to suppress duplicate XDCR processing, and re-deploy form Everything
        // we could use CAS here but at this point only one cluster will update the doc, so we can not have races.
        src_bkt[meta.id] = doc;
    }

    // This is the action on a fresh unprocessed mutation, here it is just a log message.
    log("A. Doc created/updated", meta.id, 'THIS_CLUSTER', THIS_CLUSTER, 'offline', cs.ci_offline[THIS_CLUSTER],
                                  'orig_owner', orig_owner, 'owner', doc.owner, 'crc_changed', crc_changed,doc.crc);
}

确保在导入“xcdr_supression_700.json”或“xcdr_supression_660.json”之前有两个存储桶

第一个集群的 (cluster01) 设置注意常量别名,因为您需要确保将 THIS_CLUSTER 设置为“couch01”

第二个集群 (cluster03) 的设置注意常量别名,因为您需要确保 THIS_CLUSTER 设置为“couch03”

现在,如果您使用的是 运行ning 版本 6.6.5,则您没有常量别名绑定(在事件函数的 JavaScript 中充当全局变量),因此需要取消对适当变量示例的注释对于 cluster couch01。

function OnUpdate(doc, meta) {
    // ********************************
    // MUST MATCH THE CLUSTER AND ALSO THE DOC "cluster_state"
    // *********
    var THIS_CLUSTER = "couch01"; // this could be a constant binding in 7.0.0, in 6.X we uncomment one of these to match he cluster name
    // var THIS_CLUSTER = "couch03"; // this could be a constant binding in 7.0.0, in 6.X we uncomment one of these to match he cluster name
    // ********************************
    // ....  code removed (see prior code example) ....
} 

一些comments/details:

你可能想知道为什么我们需要使用CRC函数并将其存储在进行XDCR的文档中。

Eventing 中内置的 CRC 函数 crc64() 用于检测 non-change 或由于 XDCR 文档更新而可能发生的突变。使用 CRC 和属性“owner”和“crc”允许 a) 确定拥有的集群和 b) 抑制当突变是由于 XDCR 集群到基于“活动”集群的集群复制时的事件函数。

注意,当更新文档中的 CRC 作为定时器函数的一部分时,事件函数的 OnUpdate(doc,meta) 入口点将再次被触发。如果你有定时器,当定时器触发时你必须检查 doc.owner 是否正确,如果不是你忽略定时器,即什么都不做。此外,当您“取得所有权”时,您将需要创建一个新计时器。最后,所有计时器都应该有一个 id,这样如果我们 ping pong cluster_state.ci_offline 计时器将被覆盖,这意味着您必须使用 6.6.0+ 版本,否则您需要做甚至需要做更多的工作来确定计时器何时触发,该计时器是孤立的,然后抑制任何操作。 在较旧的 Couchbase 版本中要非常小心,因为在 6.5 中您不能通过其 ID 覆盖计时器并且所有计时器 ID 都应该是唯一的

事件函数对源存储桶所做的任何突变都会被该事件函数抑制或看不到,无论文档是否被主 JavaScript 代码突变为计时器回调。然而,这些突变将通过另一个集群中的 XCDR active/active 复制看到。

关于使用事件计时器注意评论,我在前面的段落中提到了覆盖和抑制,特别是如果你坚持使用 Couchbase-server 6.5,可以这么说。

关于责任更新[=123= 文档,设想这将是 Couchbase 运行 之外的周期性脚本,在 Linux cron 中进行“活动”测试并手动覆盖。在这里要小心,因为网络分区问题很容易导致“脑裂”。

关于 cluster_state 的评论,该文档受 XCDR 约束,它是一个持久性文档,active/active 复制使它看起来是一个单一的inter-cluster 文档。如果集群“关闭”,则在活动集群上对其进行更改将导致它在“关闭”集群恢复时进行复制。

Deploy/Undeploy 将通过 DCP 变异流重新处理所有当前文档(feed 边界 == Everything) - 或 - 仅处理项目或发生的变异在部署之后(feed boundary == From now)。因此,在第一种情况下您需要仔细编码,以防止对同一文档执行两次操作,而在第二种情况下您将错过突变。

最好将我们的事件函数设计为幂等的,如果使用相同的输入参数多次调用它不会产生额外的影响。这可以通过将状态存储在已处理的文档中来实现,这样您就永远不会在 re-deploy.

上重新处理它们

Pause/Resume 调用暂停将创建一个检查点并关闭事件处理。在 Resume 上,DCP 流将从检查点开始(对于每个 vBucket),您不会错过任何一个受 DCP 重复数据删除影响的突变。此外,所有在“暂停”期间触发的“活动”计时器将尽快触发(通常在接下来的 7 秒计时器扫描间隔内)。

最佳

乔恩·斯特拉巴拉 首席产品经理 - Couchbase