How to rename/transform nested fields in a JSON object using ElasticsearchSinkConnector
I am using ElasticsearchSinkConnector to store data from a Kafka topic in an Elasticsearch index. Here is a sample Kafka message:
{"ID" : "7d6203f4-3ae7-4daa-af03-71f98d619f7e",
"Timestamp" : "2020-11-02T12:05:57.87639003Z",
"Type" : "CREATION",
"PlaceType" : "home",
"Location" : {
"Lat" : 43.7575119,
"Lon" : 11.2921363
},
"Created" : "2020-11-02T12:05:57.876390266Z",
"LastUpdated" : "2020-11-02T12:05:57.876390398Z"}
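I want to represent the Location object as a geo_point in ES, but Lat/Lon must be lower case for it to be accepted as a geo_point object. I am using ReplaceField$Value to rename Location to "location", but I cannot rename the nested fields Lat/Lon. Here is my snippet for renaming Location, Lat and Lon: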
transforms: 'RenameField'
transforms.RenameField.type: org.apache.kafka.connect.transforms.ReplaceField$Value
transforms.RenameField.renames: 'Location:location,location.Lat:lat,location.Lon:lon'
The Location rename works, but the Lat/Lon renames do not. In short, I would like to get the following result in ES:
{"ID" : "7d6203f4-3ae7-4daa-af03-71f98d619f7e",
"Timestamp" : "2020-11-02T12:05:57.87639003Z",
"Type" : "CREATION",
"PlaceType" : "home",
"location" : {
"lat" : 43.7575119,
"lon" : 11.2921363
},
"Created" : "2020-11-02T12:05:57.876390266Z",
"LastUpdated" : "2020-11-02T12:05:57.876390398Z"}
Update
Great, thanks a lot. I ran into a problem when creating my target stream in ksql-cli.
ksql> CREATE STREAM ES_PLACES_EVENT WITH (KAFKA_TOPIC='es-places-event') AS
> SELECT *,
> STRUCT('lat' = LOCATION->LAT, 'lon'= LOCATION->LON) AS "location"
> FROM PLACES_EVENT;
Can't find any functions with the name 'STRUCT'
ksql> CREATE STREAM ES_PLACES_EVENT WITH (KAFKA_TOPIC='es-places-event') AS
> SELECT *,
> STRUCT('lat' = LOCATION->LAT, 'lon'= LOCATION->LON) AS 'location'
> FROM PLACES_EVENT;
line 3:64: mismatched input ''location'' expecting {'NO', 'INTEGER', 'DATE', 'TIME', 'TIMESTAMP', 'INTERVAL', 'YEAR', 'MONTH', 'DAY', 'HOUR', 'MINUTE', 'SECOND', 'ZONE', 'PARTITION', 'STRUCT', 'REPLACE', 'EXPLAIN', 'ANALYZE', 'FORMAT', 'TYPE', 'TEXT', 'SHOW', 'TABLES', 'SCHEMAS', 'COLUMNS', 'COLUMN', 'PARTITIONS', 'FUNCTIONS', 'FUNCTION', 'ARRAY', 'MAP', 'SET', 'RESET', 'SESSION', 'DATA', 'IF', IDENTIFIER, DIGIT_IDENTIFIER, QUOTED_IDENTIFIER, BACKQUOTED_IDENTIFIER}
Caused by: org.antlr.v4.runtime.InputMismatchException
I tried giving the STRUCT an unquoted name, but ksql threw an error similar to the first one.
ksql> CREATE STREAM ES_PLACES_EVENT WITH (KAFKA_TOPIC='es-places-event') AS
> SELECT *,
> STRUCT('lat' = LOCATION->LAT, 'lon'= LOCATION->LON) AS GeoPointLocation
> FROM PLACES_EVENT;
Can't find any functions with the name 'STRUCT'
Can you help me?
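I hit exactly this same problem, and I am not aware of an existing Single Message Transform that can help. You have a couple of options:
Write your own Single Message Transform to do this
Use ksqlDB to wrangle the schema, which is the route I chose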
CREATE STREAM OUTPUT_STREAM AS
SELECT *,
STRUCT("lat" := LATITUDE, "lon":= LONGITUDE) AS "location"
FROM SOURCE_STREAM
EMIT CHANGES;
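For the first option, the renaming logic that a custom Single Message Transform would need is small. The following is only a sketch of that logic in plain Python, not a working SMT: a real one would be written in Java against Kafka Connect's Transformation interface. The function name rename_nested_fields and the dotted-path format are hypothetical, chosen for illustration.

```python
import copy


def rename_nested_fields(record, renames):
    """Return a copy of record with keys renamed along dotted paths.

    renames maps a dotted path in the ORIGINAL record (e.g. "Location.Lat")
    to the new name of its last segment (e.g. "lat").
    """
    result = copy.deepcopy(record)
    # Rename the deepest paths first so that renaming a parent
    # does not invalidate the paths of its children.
    ordered = sorted(renames.items(), key=lambda kv: -kv[0].count("."))
    for path, new_name in ordered:
        *parents, leaf = path.split(".")
        node = result
        for key in parents:
            node = node.get(key) if isinstance(node, dict) else None
            if node is None:
                break
        # Silently skip paths that do not exist in this record.
        if isinstance(node, dict) and leaf in node:
            node[new_name] = node.pop(leaf)
    return result


message = {
    "ID": "7d6203f4-3ae7-4daa-af03-71f98d619f7e",
    "Location": {"Lat": 43.7575119, "Lon": 11.2921363},
}
renamed = rename_nested_fields(
    message,
    {"Location": "location", "Location.Lat": "lat", "Location.Lon": "lon"},
)
print(renamed)
```

Unlike the ReplaceField configuration in the question, the paths here refer to the original field names, so the order in which the renames are listed does not matter.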
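If you do not already have the Elasticsearch index set up, you will also need to create a mapping template
Expanding on the ksqlDB example:
Populate the source topic with the sample data: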
kafkacat -b localhost:9092 -P -t input_topic <<EOF
{ "ID": "7d6203f4-3ae7-4daa-af03-71f98d619f7e", "Timestamp": "2020-11-02T12:05:57.87639003Z", "Type": "CREATION", "PlaceType": "home", "Location": { "Lat": 43.7575119, "Lon": 11.2921363 }, "Created": "2020-11-02T12:05:57.876390266Z", "LastUpdated": "2020-11-02T12:05:57.876390398Z" }
EOF
Taking the source topic (input_topic in this example), declare the ksqlDB STREAM object (which is basically a Kafka topic with a schema overlaid):
CREATE STREAM SOURCE_STREAM (ID VARCHAR,
Timestamp VARCHAR,
Type VARCHAR,
PlaceType VARCHAR,
Location STRUCT<Lat DOUBLE, Lon DOUBLE>,
Created VARCHAR,
LastUpdated VARCHAR)
WITH (KAFKA_TOPIC='input_topic',
VALUE_FORMAT='JSON');
Confirm that the stream's schema is valid by selecting fields from the first message:
ksql> SET 'auto.offset.reset' = 'earliest';
>
Successfully changed local property 'auto.offset.reset' to 'earliest'. Use the UNSET command to revert your change.
ksql> SELECT ID, PLACETYPE, LOCATION->LAT, LOCATION->LON FROM SOURCE_STREAM EMIT CHANGES LIMIT 1;
+---------------------------------------+----------+-----------+-----------+
|ID |PLACETYPE |LAT |LON |
+---------------------------------------+----------+-----------+-----------+
|7d6203f4-3ae7-4daa-af03-71f98d619f7e |home |43.7575119 |11.2921363 |
Limit Reached
Query terminated
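Create a target stream, mapping the lat/lon fields to lower-case names. Here I am also showing the alternative approach of concatenating them into a string, which Elasticsearch will also accept: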
CREATE STREAM TARGET_STREAM WITH (KAFKA_TOPIC='target_topic') AS
SELECT *,
STRUCT("lat" := LOCATION->LAT, "lon":= LOCATION->LON) AS "location_example_01",
CAST(LOCATION->LAT AS VARCHAR) + ',' + CAST(LOCATION->LON AS VARCHAR) AS "location_example_02"
FROM SOURCE_STREAM;
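To make the two shapes concrete, here is a small Python sketch of what each column is meant to hold for the sample message. It only mirrors the intent of the ksqlDB expressions above; the helper names are hypothetical, and the exact text that ksqlDB's CAST produces for a DOUBLE is not something this sketch tries to reproduce.

```python
def geo_point_object(location):
    """Object form of a geo_point: lower-case "lat" and "lon" keys."""
    return {"lat": location["Lat"], "lon": location["Lon"]}


def geo_point_string(location):
    """String form of a geo_point: latitude first, then longitude."""
    return f"{location['Lat']},{location['Lon']}"


location = {"Lat": 43.7575119, "Lon": 11.2921363}
print(geo_point_object(location))  # {'lat': 43.7575119, 'lon': 11.2921363}
print(geo_point_string(location))  # 43.7575119,11.2921363
```

Note the ordering in the string form: latitude comes before longitude.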
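Create an index template for Elasticsearch if the index does not already have the geo_point mapping declared. Here it will match any index whose name begins with target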
curl --silent --show-error -XPUT -H 'Content-Type: application/json' \
http://localhost:9200/_index_template/rmoff_template01/ \
-d'{
"index_patterns": [ "target*" ],
"template": {
"mappings": {
"properties": {
"location_example_01": {
"type": "geo_point"
},
"location_example_02": {
"type": "geo_point"
}
}
}
} }'
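If you would rather script this step than use curl, the same request can be sketched with the Python standard library. This assumes the same endpoint and body as the curl command above; the function names are hypothetical, and put_index_template is only defined here, not called, because it needs a running Elasticsearch.

```python
import json
import urllib.request


def build_geo_point_template(index_pattern, geo_fields):
    """Build an index template body that maps each given field as geo_point."""
    return {
        "index_patterns": [index_pattern],
        "template": {
            "mappings": {
                "properties": {name: {"type": "geo_point"} for name in geo_fields}
            }
        },
    }


def put_index_template(base_url, template_name, body):
    """PUT the template to Elasticsearch and return the parsed JSON response."""
    request = urllib.request.Request(
        url=f"{base_url}/_index_template/{template_name}",
        data=json.dumps(body).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="PUT",
    )
    with urllib.request.urlopen(request) as response:
        return json.loads(response.read().decode("utf-8"))


body = build_geo_point_template(
    "target*", ["location_example_01", "location_example_02"]
)
print(json.dumps(body, indent=2))
# Example call against a local node (assumed URL):
# put_index_template("http://localhost:9200", "rmoff_template01", body)
```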
Use Kafka Connect to stream the data from Kafka to Elasticsearch. You can configure this using the native Kafka Connect REST API, or directly from ksqlDB itself:
CREATE SINK CONNECTOR SINK_ELASTIC_01 WITH (
'connector.class' = 'io.confluent.connect.elasticsearch.ElasticsearchSinkConnector',
'topics' = 'target_topic',
'key.converter' = 'org.apache.kafka.connect.storage.StringConverter',
'value.converter' = 'org.apache.kafka.connect.json.JsonConverter',
'value.converter.schemas.enable' = 'false',
'connection.url' = 'http://elasticsearch:9200',
'type.name' = '_doc',
'key.ignore' = 'true',
'schema.ignore' = 'true');
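For the REST API route, the same settings go into a JSON document posted to the Connect worker's /connectors endpoint. The sketch below builds that payload in Python; the worker address http://localhost:8083 in the commented call is an assumption about your environment, the function names are hypothetical, and create_connector is not invoked because it needs a running worker.

```python
import json
import urllib.request


def build_connector_request(name, config):
    """Build the payload for creating a connector: a name plus string-valued config."""
    return {"name": name, "config": {k: str(v) for k, v in config.items()}}


def create_connector(connect_url, payload):
    """POST the payload to the Kafka Connect REST API and return the parsed response."""
    request = urllib.request.Request(
        url=f"{connect_url}/connectors",
        data=json.dumps(payload).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )
    with urllib.request.urlopen(request) as response:
        return json.loads(response.read().decode("utf-8"))


payload = build_connector_request(
    "SINK_ELASTIC_01",
    {
        "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
        "topics": "target_topic",
        "key.converter": "org.apache.kafka.connect.storage.StringConverter",
        "value.converter": "org.apache.kafka.connect.json.JsonConverter",
        "value.converter.schemas.enable": "false",
        "connection.url": "http://elasticsearch:9200",
        "type.name": "_doc",
        "key.ignore": "true",
        "schema.ignore": "true",
    },
)
print(json.dumps(payload, indent=2))
# Example call against a local Connect worker (assumed URL):
# create_connector("http://localhost:8083", payload)
```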
Check the mappings in the new Elasticsearch index
curl -XGET --silent --show-error http://localhost:9200"/target_topic/_mappings" | jq '.'
{
"target_topic": {
"mappings": {
"properties": {
"CREATED": {
"type": "date"
},
"ID": {
"type": "text",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 256
}
}
},
"LASTUPDATED": {
"type": "date"
},
"LOCATION": {
"properties": {
"LAT": {
"type": "float"
},
"LON": {
"type": "float"
}
}
},
"PLACETYPE": {
"type": "text",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 256
}
}
},
"TIMESTAMP": {
"type": "date"
},
"TYPE": {
"type": "text",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 256
}
}
},
"location_example_01": {
"type": "geo_point"
},
"location_example_02": {
"type": "geo_point"
}
}
}
}
}
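What matters in that output is that both location_example fields came out as geo_point, while the original LOCATION struct was mapped as a plain object with float fields. A minimal Python sketch of that check, run against a trimmed copy of the response above (the function name is hypothetical):

```python
def geo_point_fields(mappings_response, index_name):
    """Return the sorted names of top-level fields mapped as geo_point."""
    properties = mappings_response[index_name]["mappings"]["properties"]
    return sorted(
        name for name, spec in properties.items() if spec.get("type") == "geo_point"
    )


# Trimmed copy of the _mappings response shown above.
response = {
    "target_topic": {
        "mappings": {
            "properties": {
                "LOCATION": {
                    "properties": {
                        "LAT": {"type": "float"},
                        "LON": {"type": "float"},
                    }
                },
                "location_example_01": {"type": "geo_point"},
                "location_example_02": {"type": "geo_point"},
            }
        }
    }
}
print(geo_point_fields(response, "target_topic"))
# ['location_example_01', 'location_example_02']
```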
View the data