配置使用 Kafka Connect Cassandra Source 时写入 Kafka Topic 的内容
Configuring what is written to Kafka Topic when using Kafka Connect Cassandra Source
我正在做一个 spike,我们希望将在 Cassandra table 中写入的数据发布到 Kafka 主题。我们正在考虑使用 Kafka Connect 和 Stream Reactor 连接器。
我正在使用 Kafka 0.10.0.1
我正在使用 DataMountaineer Stream Reactor 0.2.4
我将 Stream Reactor 的 jar 文件放入 Kafka libs 文件夹中,然后 运行 Kafka Connect 处于分布式模式
bin/connect-distributed.sh config/connect-distributed.properties
我按如下方式添加了 Cassandra Source 连接器:
curl -X POST -H "Content-Type: application/json" -d @config/connect-idoc-cassandra-source.json.txt localhost:8083/connectors
当我向 Cassandra 添加数据时 table 我看到使用 Kafka 命令行消费者将其添加到主题
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic idocs-topic --from-beginning
这是目前正在写入主题的示例:
{
"schema": {
"type": "struct",
"fields": [{
"type": "string",
"optional": true,
"field": "idoc_id"
}, {
"type": "string",
"optional": true,
"field": "idoc_event_ts"
}, {
"type": "string",
"optional": true,
"field": "json_doc"
}],
"optional": false,
"name": "idoc.idocs_events"
},
"payload": {
"idoc_id": "dc4ab8a0-fdf8-11e6-8285-1bce55915fdd",
"idoc_event_ts": "dc4ab8a1-fdf8-11e6-8285-1bce55915fdd",
"json_doc": "{\"foo\":\"bar\"}"
}}
我想写的主题是 json_doc
列的值。
这是我在 Cassandra 源配置中的内容
{
"name": "cassandra-idocs",
"config": {
"tasks.max": "1",
"connector.class": "com.datamountaineer.streamreactor.connect.cassandra.source.CassandraSourceConnector",
"connect.cassandra.key.space": "idoc",
"connect.cassandra.source.kcql": "INSERT INTO idocs-topic SELECT json_doc FROM idocs_events PK idoc_event_ts",
"connect.cassandra.import.mode": "incremental",
"connect.cassandra.contact.points": "localhost",
"connect.cassandra.port": 9042,
"connect.cassandra.import.poll.interval": 10000
}}
如何更改 Kafka Connect Cassandra Source 的配置方式,以便仅将 json_doc
的值写入主题,使其看起来像这样:
{"foo":"bar"}
Kassandra Connect Query Language 似乎是可行的方法,但它并不限制写入 KCQL 中指定列的内容。
更新
看到这个 并将 connect-distributed.properties
文件中的转换器从 JsonConverter
更改为 StringConverter
。
结果是现在写入主题:
Struct{idoc_id=74597cf0-fdf7-11e6-8285-1bce55915fdd,idoc_event_ts=74597cf1-fdf7-11e6-8285-1bce55915fdd,json_doc={"foo":"bar"}}
更新 2
将 connect-distributed.properties
文件中的转换器改回 JsonConverter
。然后还禁用了架构。
key.converter.schemas.enable=false
value.converter.schemas.enable=false
结果是现在写入主题:
{
"idoc_id": "dc4ab8a0-fdf8-11e6-8285-1bce55915fdd",
"idoc_event_ts": "dc4ab8a1-fdf8-11e6-8285-1bce55915fdd",
"json_doc": "{\"foo\":\"bar\"}"
}
备注
使用快照发布中的代码并将 KCQL 更改为
INSERT INTO idocs-topic
SELECT json_doc, idoc_event_ts
FROM idocs_events
IGNORE idoc_event_ts
PK idoc_event_ts
在主题
上产生了这个结果
{"json_doc": "{\"foo\":\"bar\"}"}
谢谢
原来我试图做的事情在 DataMountaineer Stream Reactor 0.2.4 的 Cassandra 源中是不可能的。然而,快照版本(我假设将成为版本 0.2.5)将支持这一点。
这是它的工作原理:
1) 将 connect-distributed.properties
文件中的转换器设置为 StringConverter
.
2) 将 Cassandra Source 连接器的 JSON 配置中的 KCQL 设置为
INSERT INTO idocs-topic
SELECT json_doc, idoc_event_ts
FROM idocs_events
IGNORE idoc_event_ts
PK idoc_event_ts
WITHUNWRAP
这将导致 json_doc
列的值在没有任何模式信息或列名称本身的情况下发布到 Kafka 主题。
因此,如果列 json_doc
包含值 {"foo":"bar"}
那么这就是主题中会出现的内容:
{"foo":"bar"}
以下是有关 KCQL 在快照版本中如何工作的一些背景信息。
SELECT
现在将仅检索 table 中在 KCQL 中指定的列。最初它总是检索所有的列。需要注意的是,使用 incremental
导入模式时,PK 列必须是 SELECT
语句的一部分。如果 PK 列的值不应该包含在发布到 Kafka 主题的消息中,则将其添加到 IGNORE
语句中(如上例所示)。
WITHUNWRAP
是 KCQL 的新功能,它将告诉 Cassandra Source 连接器使用 String Schema
类型(而不是 Struct)创建 SourceRecord
。在此模式下,只有 SELECT
语句中的列的值将存储为 SourceRecord
的值。如果在应用 IGNORE
语句后 SELECT
语句中有多个列,则将这些值附加在一起并用逗号分隔。
我正在做一个 spike,我们希望将在 Cassandra table 中写入的数据发布到 Kafka 主题。我们正在考虑使用 Kafka Connect 和 Stream Reactor 连接器。
我正在使用 Kafka 0.10.0.1
我正在使用 DataMountaineer Stream Reactor 0.2.4
我将 Stream Reactor 的 jar 文件放入 Kafka libs 文件夹中,然后 运行 Kafka Connect 处于分布式模式
bin/connect-distributed.sh config/connect-distributed.properties
我按如下方式添加了 Cassandra Source 连接器:
curl -X POST -H "Content-Type: application/json" -d @config/connect-idoc-cassandra-source.json.txt localhost:8083/connectors
当我向 Cassandra 添加数据时 table 我看到使用 Kafka 命令行消费者将其添加到主题
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic idocs-topic --from-beginning
这是目前正在写入主题的示例:
{
"schema": {
"type": "struct",
"fields": [{
"type": "string",
"optional": true,
"field": "idoc_id"
}, {
"type": "string",
"optional": true,
"field": "idoc_event_ts"
}, {
"type": "string",
"optional": true,
"field": "json_doc"
}],
"optional": false,
"name": "idoc.idocs_events"
},
"payload": {
"idoc_id": "dc4ab8a0-fdf8-11e6-8285-1bce55915fdd",
"idoc_event_ts": "dc4ab8a1-fdf8-11e6-8285-1bce55915fdd",
"json_doc": "{\"foo\":\"bar\"}"
}}
我想写的主题是 json_doc
列的值。
这是我在 Cassandra 源配置中的内容
{
"name": "cassandra-idocs",
"config": {
"tasks.max": "1",
"connector.class": "com.datamountaineer.streamreactor.connect.cassandra.source.CassandraSourceConnector",
"connect.cassandra.key.space": "idoc",
"connect.cassandra.source.kcql": "INSERT INTO idocs-topic SELECT json_doc FROM idocs_events PK idoc_event_ts",
"connect.cassandra.import.mode": "incremental",
"connect.cassandra.contact.points": "localhost",
"connect.cassandra.port": 9042,
"connect.cassandra.import.poll.interval": 10000
}}
如何更改 Kafka Connect Cassandra Source 的配置方式,以便仅将 json_doc
的值写入主题,使其看起来像这样:
{"foo":"bar"}
Kassandra Connect Query Language 似乎是可行的方法,但它并不限制写入 KCQL 中指定列的内容。
更新
看到这个 connect-distributed.properties
文件中的转换器从 JsonConverter
更改为 StringConverter
。
结果是现在写入主题:
Struct{idoc_id=74597cf0-fdf7-11e6-8285-1bce55915fdd,idoc_event_ts=74597cf1-fdf7-11e6-8285-1bce55915fdd,json_doc={"foo":"bar"}}
更新 2
将 connect-distributed.properties
文件中的转换器改回 JsonConverter
。然后还禁用了架构。
key.converter.schemas.enable=false
value.converter.schemas.enable=false
结果是现在写入主题:
{
"idoc_id": "dc4ab8a0-fdf8-11e6-8285-1bce55915fdd",
"idoc_event_ts": "dc4ab8a1-fdf8-11e6-8285-1bce55915fdd",
"json_doc": "{\"foo\":\"bar\"}"
}
备注 使用快照发布中的代码并将 KCQL 更改为
INSERT INTO idocs-topic
SELECT json_doc, idoc_event_ts
FROM idocs_events
IGNORE idoc_event_ts
PK idoc_event_ts
在主题
上产生了这个结果{"json_doc": "{\"foo\":\"bar\"}"}
谢谢
原来我试图做的事情在 DataMountaineer Stream Reactor 0.2.4 的 Cassandra 源中是不可能的。然而,快照版本(我假设将成为版本 0.2.5)将支持这一点。
这是它的工作原理:
1) 将 connect-distributed.properties
文件中的转换器设置为 StringConverter
.
2) 将 Cassandra Source 连接器的 JSON 配置中的 KCQL 设置为
INSERT INTO idocs-topic
SELECT json_doc, idoc_event_ts
FROM idocs_events
IGNORE idoc_event_ts
PK idoc_event_ts
WITHUNWRAP
这将导致 json_doc
列的值在没有任何模式信息或列名称本身的情况下发布到 Kafka 主题。
因此,如果列 json_doc
包含值 {"foo":"bar"}
那么这就是主题中会出现的内容:
{"foo":"bar"}
以下是有关 KCQL 在快照版本中如何工作的一些背景信息。
SELECT
现在将仅检索 table 中在 KCQL 中指定的列。最初它总是检索所有的列。需要注意的是,使用 incremental
导入模式时,PK 列必须是 SELECT
语句的一部分。如果 PK 列的值不应该包含在发布到 Kafka 主题的消息中,则将其添加到 IGNORE
语句中(如上例所示)。
WITHUNWRAP
是 KCQL 的新功能,它将告诉 Cassandra Source 连接器使用 String Schema
类型(而不是 Struct)创建 SourceRecord
。在此模式下,只有 SELECT
语句中的列的值将存储为 SourceRecord
的值。如果在应用 IGNORE
语句后 SELECT
语句中有多个列,则将这些值附加在一起并用逗号分隔。