配置使用 Kafka Connect Cassandra Source 时写入 Kafka Topic 的内容

Configuring what is written to Kafka Topic when using Kafka Connect Cassandra Source

我正在做一个 spike,我们希望将在 Cassandra table 中写入的数据发布到 Kafka 主题。我们正在考虑使用 Kafka Connect 和 Stream Reactor 连接器。

我正在使用 Kafka 0.10.0.1

我正在使用 DataMountaineer Stream Reactor 0.2.4

我将 Stream Reactor 的 jar 文件放入 Kafka libs 文件夹中,然后 运行 Kafka Connect 处于分布式模式

bin/connect-distributed.sh config/connect-distributed.properties

我按如下方式添加了 Cassandra Source 连接器:

curl -X POST -H "Content-Type: application/json" -d @config/connect-idoc-cassandra-source.json.txt localhost:8083/connectors

当我向 Cassandra 添加数据时 table 我看到使用 Kafka 命令行消费者将其添加到主题

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic idocs-topic --from-beginning

这是目前正在写入主题的示例:

{
"schema": {
    "type": "struct",
    "fields": [{
        "type": "string",
        "optional": true,
        "field": "idoc_id"
    }, {
        "type": "string",
        "optional": true,
        "field": "idoc_event_ts"
    }, {
        "type": "string",
        "optional": true,
        "field": "json_doc"
    }],
    "optional": false,
    "name": "idoc.idocs_events"
},
"payload": {
    "idoc_id": "dc4ab8a0-fdf8-11e6-8285-1bce55915fdd",
    "idoc_event_ts": "dc4ab8a1-fdf8-11e6-8285-1bce55915fdd",
    "json_doc": "{\"foo\":\"bar\"}"
}}

我想写的主题是 json_doc 列的值。

这是我在 Cassandra 源配置中的内容

{
"name": "cassandra-idocs",
"config": {
    "tasks.max": "1",
    "connector.class": "com.datamountaineer.streamreactor.connect.cassandra.source.CassandraSourceConnector",
    "connect.cassandra.key.space": "idoc",
    "connect.cassandra.source.kcql": "INSERT INTO idocs-topic SELECT json_doc FROM idocs_events PK idoc_event_ts",
    "connect.cassandra.import.mode": "incremental",
    "connect.cassandra.contact.points": "localhost",
    "connect.cassandra.port": 9042,
    "connect.cassandra.import.poll.interval": 10000
}}

如何更改 Kafka Connect Cassandra Source 的配置方式,以便仅将 json_doc 的值写入主题,使其看起来像这样:

{"foo":"bar"}

Kassandra Connect Query Language 似乎是可行的方法,但它并不限制写入 KCQL 中指定列的内容。

更新

看到这个 并将 connect-distributed.properties 文件中的转换器从 JsonConverter 更改为 StringConverter

结果是现在写入主题:

Struct{idoc_id=74597cf0-fdf7-11e6-8285-1bce55915fdd,idoc_event_ts=74597cf1-fdf7-11e6-8285-1bce55915fdd,json_doc={"foo":"bar"}}

更新 2

connect-distributed.properties 文件中的转换器改回 JsonConverter。然后还禁用了架构。

key.converter.schemas.enable=false
value.converter.schemas.enable=false 

结果是现在写入主题:

{
    "idoc_id": "dc4ab8a0-fdf8-11e6-8285-1bce55915fdd",
    "idoc_event_ts": "dc4ab8a1-fdf8-11e6-8285-1bce55915fdd",
    "json_doc": "{\"foo\":\"bar\"}"
}

备注 使用快照发布中的代码并将 KCQL 更改为

INSERT INTO idocs-topic 
SELECT json_doc, idoc_event_ts
FROM idocs_events 
IGNORE idoc_event_ts
PK idoc_event_ts

在主题

上产生了这个结果
{"json_doc": "{\"foo\":\"bar\"}"}

谢谢

原来我试图做的事情在 DataMountaineer Stream Reactor 0.2.4 的 Cassandra 源中是不可能的。然而,快照版本(我假设将成为版本 0.2.5)将支持这一点。

这是它的工作原理:

1) 将 connect-distributed.properties 文件中的转换器设置为 StringConverter.

2) 将 Cassandra Source 连接器的 JSON 配置中的 KCQL 设置为

INSERT INTO idocs-topic 
SELECT json_doc, idoc_event_ts 
  FROM idocs_events 
IGNORE idoc_event_ts 
    PK idoc_event_ts 
WITHUNWRAP

这将导致 json_doc 列的值在没有任何模式信息或列名称本身的情况下发布到 Kafka 主题。

因此,如果列 json_doc 包含值 {"foo":"bar"} 那么这就是主题中会出现的内容:

{"foo":"bar"}

以下是有关 KCQL 在快照版本中如何工作的一些背景信息。

SELECT 现在将仅检索 table 中在 KCQL 中指定的列。最初它总是检索所有的列。需要注意的是,使用 incremental 导入模式时,PK 列必须是 SELECT 语句的一部分。如果 PK 列的值不应该包含在发布到 Kafka 主题的消息中,则将其添加到 IGNORE 语句中(如上例所示)。

WITHUNWRAP 是 KCQL 的新功能,它将告诉 Cassandra Source 连接器使用 String Schema 类型(而不是 Struct)创建 SourceRecord。在此模式下,只有 SELECT 语句中的列的值将存储为 SourceRecord 的值。如果在应用 IGNORE 语句后 SELECT 语句中有多个列,则将这些值附加在一起并用逗号分隔。