使用 Kafka Connect Elasticsearch Connector 的消息顺序

Message order with Kafka Connect Elasticsearch Connector

我们在强制使用 Kafka Connect Elasticsearch 连接器将来自 Kafka 主题的消息发送到 Elasticsearch 的顺序时遇到问题。在主题中,消息以正确的顺序和正确的偏移量排列,但如果快速连续创建了两条具有相同 ID 的消息,它们会间歇性地以错误的顺序发送到 Elasticsearch。这会导致 Elasticsearch 从倒数第二条消息中获取数据,而不是从最后一条消息中获取数据。如果我们在主题中的两条消息之间人为地添加一两秒的延迟,问题就会消失。

文档 here 指出:

Document-level update ordering is ensured by using the partition-level Kafka offset as the document version, and using version_mode=external.

但是我在任何地方都找不到关于此 version_mode 设置的任何文档,也找不到我们是否需要在某处设置自己的文档。

在 Kafka Connect 系统的日志文件中,我们可以看到两条消息(对于相同的 ID)以错误的顺序处理,相隔几毫秒。看起来这些是在不同的线程中处理的,这可能很重要。另请注意,此主题只有一个分区,因此所有消息都在同一分区中。

以下是日志片段,为清楚起见略作编辑。 Kafka 主题中的消息由 Debezium 填充,我认为这与问题无关,但恰好包含时间戳值。这表明消息的​​处理顺序错误(尽管它们在 Kafka 主题中的顺序正确,由 Debezium 填充):

[2019-01-17 09:10:05,671] DEBUG http-outgoing-1 >> "
{
  "op": "u",
  "before": {
    "id": "ac025cb2-1a37-11e9-9c89-7945a1bd7dd1",
    ... << DATA FROM BEFORE SECOND UPDATE >> ...
  },
  "after": {
    "id": "ac025cb2-1a37-11e9-9c89-7945a1bd7dd1",
    ... << DATA FROM AFTER SECOND UPDATE >> ...
  },
  "source": { ... },
  "ts_ms": 1547716205205
}
" (org.apache.http.wire)

...

[2019-01-17 09:10:05,696] DEBUG http-outgoing-2 >> "
{
  "op": "u",
  "before": {
    "id": "ac025cb2-1a37-11e9-9c89-7945a1bd7dd1",
    ... << DATA FROM BEFORE FIRST UPDATE >> ...
  },
  "after": {
    "id": "ac025cb2-1a37-11e9-9c89-7945a1bd7dd1",
    ... << DATA FROM AFTER FIRST UPDATE >> ...
  },
  "source": { ... },
  "ts_ms": 1547716204190
}
" (org.apache.http.wire)

有谁知道在向 Elasticsearch 发送消息时如何强制此连接器维护给定文档 ID 的消息顺序?

问题是我们的 Elasticsearch 连接器将 key.ignore 配置设置为 true

我们在连接器的 Github 源代码中发现了这一行(在 DataConverter.java 中):

final Long version = ignoreKey ? null : record.kafkaOffset();

这意味着,对于 key.ignore=true,正在生成并发送到 Elasticsearch 的索引操作实际上是 "versionless" ...基本上,Elasticsearch 收到的文档的最后一组数据将替换任何以前的数据,即使它是 "old data".

从日志文件来看,连接器似乎有多个消费者线程读取源主题,然后将转换后的消息传递给 Elasticsearch,但它们传递给 Elasticsearch 的顺序不一定与主题相同订单。

使用key.ignore=false,每个Elasticsearch消息现在都包含一个等于Kafka记录偏移量的版本值,如果Elasticsearch已经接收到稍后的数据,Elasticsearch拒绝更新文档的索引数据"version".

这不是 唯一 解决此问题的方法。我们仍然必须对来自 Kafka 主题的 Debezium 消息应用转换,以将密钥转换为 Elasticsearch 满意的纯文本格式:

"transforms": "ExtractKey",
"transforms.ExtractKey.type": "org.apache.kafka.connect.transforms.ExtractField$Key",
"transforms.ExtractKey.field": "id"