How to rename/transform nested fields in json object using ElasticsearchSinkConnector

I'm using the ElasticsearchSinkConnector to store data from a Kafka topic in an Elasticsearch index. Here is an example Kafka message:

{
  "ID": "7d6203f4-3ae7-4daa-af03-71f98d619f7e",
  "Timestamp": "2020-11-02T12:05:57.87639003Z",
  "Type": "CREATION",
  "PlaceType": "home",
  "Location": {
    "Lat": 43.7575119,
    "Lon": 11.2921363
  },
  "Created": "2020-11-02T12:05:57.876390266Z",
  "LastUpdated": "2020-11-02T12:05:57.876390398Z"
}

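I want to represent the Location object in ES as a geo_point, and for that the Lat/Lon keys must be lower-case. I'm using ReplaceField$Value to rename Location to "location", but I can't rename the nested fields Lat/Lon. Here is the snippet I use to rename Location, Lat and Lon: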

transforms: 'RenameField'
transforms.RenameField.type: org.apache.kafka.connect.transforms.ReplaceField$Value
transforms.RenameField.renames: 'Location:location,location.Lat:lat,location.Lon:lon'

Renaming Location works, but renaming Lat/Lon doesn't. In short, this is the result I want in ES:

{
  "ID": "7d6203f4-3ae7-4daa-af03-71f98d619f7e",
  "Timestamp": "2020-11-02T12:05:57.87639003Z",
  "Type": "CREATION",
  "PlaceType": "home",
  "location": {
    "lat": 43.7575119,
    "lon": 11.2921363
  },
  "Created": "2020-11-02T12:05:57.876390266Z",
  "LastUpdated": "2020-11-02T12:05:57.876390398Z"
}
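
The behaviour described above can be modelled with a short Python sketch. It assumes, consistent with what the question reports, that ReplaceField$Value matches each `renames` entry only against top-level field names, so an entry such as `location.Lat:lat` is treated as a literal top-level name that never matches anything. The helpers `parse_renames` and `flat_rename` are hypothetical illustrations, not the Kafka Connect implementation.

```python
import json

def parse_renames(spec: str) -> dict:
    """Parse an 'old:new,old2:new2' string (the shape of the renames setting) into a dict."""
    pairs = (item.split(":", 1) for item in spec.split(",") if item)
    return {old.strip(): new.strip() for old, new in pairs}

def flat_rename(record: dict, renames: dict) -> dict:
    """Rename top-level keys only; nested objects are copied through unchanged.

    This models the assumed top-level-only behaviour of ReplaceField.
    """
    return {renames.get(key, key): value for key, value in record.items()}

message = {
    "ID": "7d6203f4-3ae7-4daa-af03-71f98d619f7e",
    "PlaceType": "home",
    "Location": {"Lat": 43.7575119, "Lon": 11.2921363},
}
renames = parse_renames("Location:location,location.Lat:lat,location.Lon:lon")
result = flat_rename(message, renames)
# "Location" becomes "location", but the nested "Lat"/"Lon" keys keep their case.
print(json.dumps(result))
```

Because no top-level key is literally named `location.Lat`, those two entries are silently ignored, which matches the symptom in the question.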

Update

Great, thanks a lot. However, I'm having a problem creating my target stream in ksql-cli:

ksql> CREATE STREAM ES_PLACES_EVENT WITH (KAFKA_TOPIC='es-places-event') AS
>    SELECT *,
>        STRUCT('lat' = LOCATION->LAT, 'lon'= LOCATION->LON) AS "location"
>    FROM PLACES_EVENT;
Can't find any functions with the name 'STRUCT'
ksql> CREATE STREAM ES_PLACES_EVENT WITH (KAFKA_TOPIC='es-places-event') AS
>    SELECT *,
>        STRUCT('lat' = LOCATION->LAT, 'lon'= LOCATION->LON) AS 'location'
>    FROM PLACES_EVENT;
line 3:64: mismatched input ''location'' expecting {'NO', 'INTEGER', 'DATE', 'TIME', 'TIMESTAMP', 'INTERVAL', 'YEAR', 'MONTH', 'DAY', 'HOUR', 'MINUTE', 'SECOND', 'ZONE', 'PARTITION', 'STRUCT', 'REPLACE', 'EXPLAIN', 'ANALYZE', 'FORMAT', 'TYPE', 'TEXT', 'SHOW', 'TABLES', 'SCHEMAS', 'COLUMNS', 'COLUMN', 'PARTITIONS', 'FUNCTIONS', 'FUNCTION', 'ARRAY', 'MAP', 'SET', 'RESET', 'SESSION', 'DATA', 'IF', IDENTIFIER, DIGIT_IDENTIFIER, QUOTED_IDENTIFIER, BACKQUOTED_IDENTIFIER}
Caused by: org.antlr.v4.runtime.InputMismatchException

I tried giving the STRUCT an alias without quotes, but ksql throws an error similar to the first one:

ksql> CREATE STREAM ES_PLACES_EVENT WITH (KAFKA_TOPIC='es-places-event') AS
>    SELECT *,
>        STRUCT('lat' = LOCATION->LAT, 'lon'= LOCATION->LON) AS GeoPointLocation
>    FROM PLACES_EVENT;
Can't find any functions with the name 'STRUCT'

Can you help me?

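I hit exactly this problem. I'm not aware of an existing Single Message Transform that can help. You have a couple of options:

  1. Write your own Single Message Transform to do it

  2. Use ksqlDB to wrangle the schema, which is the route I chose: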

     CREATE STREAM OUTPUT_STREAM AS
         SELECT *,
                STRUCT("lat" := LATITUDE, "lon" := LONGITUDE) AS "location"
         FROM SOURCE_STREAM
         EMIT CHANGES;
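
For option 1, a real Single Message Transform would be written in Java against Kafka Connect's `Transformation` interface. The Python sketch below only models the per-record logic for schemaless JSON values, assuming a hypothetical dot-separated path syntax such as `Location.Lat`; `rename_path` and `apply_renames` are illustrative names, not part of any Kafka Connect API.

```python
import copy

def rename_path(record: dict, path: str, new_name: str) -> dict:
    """Return a copy of record with the field at a dotted path renamed.

    Only the last path segment is renamed; parent objects keep their names.
    If any part of the path is missing, the record is returned unchanged.
    """
    result = copy.deepcopy(record)  # never mutate the input record
    *parents, leaf = path.split(".")
    node = result
    for key in parents:
        node = node.get(key)
        if not isinstance(node, dict):
            return result  # path not present: leave the record as-is
    if leaf in node:
        node[new_name] = node.pop(leaf)
    return result

def apply_renames(record: dict, renames: list) -> dict:
    """Apply (path, new_name) pairs in order; later paths see earlier renames."""
    for path, new_name in renames:
        record = rename_path(record, path, new_name)
    return record

message = {"PlaceType": "home", "Location": {"Lat": 43.7575119, "Lon": 11.2921363}}
out = apply_renames(
    message,
    [("Location.Lat", "lat"), ("Location.Lon", "lon"), ("Location", "location")],
)
print(out)  # {'PlaceType': 'home', 'location': {'lat': 43.7575119, 'lon': 11.2921363}}
```

The nested keys are renamed before their parent here; renaming the parent first would also work, as long as the later paths use the new name (`location.Lat`).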
    

If you don't already have the Elasticsearch index set up, you'll also need to create a mapping template.
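
As a sketch of what such a template contains, the Python below builds a body for Elasticsearch's composable index template endpoint (`PUT _index_template/<name>`) that maps a `location` field as geo_point, and prepares (but does not send) the request with the standard library. The template name `places_template`, the `target*` pattern and the `localhost:9200` address are assumptions for illustration.

```python
import json
import urllib.request

def geo_point_template(index_patterns: list, geo_fields: list) -> dict:
    """Build an index-template body that maps each named field as geo_point."""
    return {
        "index_patterns": index_patterns,
        "template": {
            "mappings": {
                "properties": {field: {"type": "geo_point"} for field in geo_fields}
            }
        },
    }

body = geo_point_template(["target*"], ["location"])

# Prepare the PUT request; pass it to urllib.request.urlopen(req) to actually send it.
req = urllib.request.Request(
    "http://localhost:9200/_index_template/places_template",  # hypothetical template name
    data=json.dumps(body).encode("utf-8"),
    headers={"Content-Type": "application/json"},
    method="PUT",
)
print(req.get_method(), req.full_url)
print(json.dumps(body, indent=2))
```

The template only applies to indices created after it exists, so it has to be in place before the sink connector creates the index.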


Expanding on the ksqlDB example:

  1. Populate the source topic with sample data:

    kafkacat -b localhost:9092 -P -t input_topic <<EOF
    { "ID": "7d6203f4-3ae7-4daa-af03-71f98d619f7e", "Timestamp": "2020-11-02T12:05:57.87639003Z", "Type": "CREATION", "PlaceType": "home", "Location": { "Lat": 43.7575119, "Lon": 11.2921363 }, "Created": "2020-11-02T12:05:57.876390266Z", "LastUpdated": "2020-11-02T12:05:57.876390398Z" }
    EOF
    
  2. Declare a ksqlDB STREAM over the source topic (essentially a Kafka topic with a schema overlaid):

    CREATE STREAM SOURCE_STREAM (ID VARCHAR,
                                Timestamp VARCHAR,
                                Type VARCHAR,
                                PlaceType VARCHAR,
                                Location STRUCT<Lat DOUBLE, Lon DOUBLE>,
                                Created VARCHAR,
                                LastUpdated VARCHAR)
            WITH (KAFKA_TOPIC='input_topic', 
                VALUE_FORMAT='JSON');
    
  3. Confirm that the stream's schema is valid by selecting fields from the first message:

    ksql> SET 'auto.offset.reset' = 'earliest';
    >
    Successfully changed local property 'auto.offset.reset' to 'earliest'. Use the UNSET command to revert your change.
    
    ksql> SELECT ID, PLACETYPE, LOCATION->LAT, LOCATION->LON FROM SOURCE_STREAM EMIT CHANGES LIMIT 1;
    +---------------------------------------+----------+-----------+-----------+
    |ID                                     |PLACETYPE |LAT        |LON        |
    +---------------------------------------+----------+-----------+-----------+
    |7d6203f4-3ae7-4daa-af03-71f98d619f7e   |home      |43.7575119 |11.2921363 |
    Limit Reached
    Query terminated
    
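  4. Create the target stream, mapping the lat/lon fields to lower-case names. Here I also show an alternative, concatenating them into a single string, which Elasticsearch will also accept: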

    CREATE STREAM TARGET_STREAM WITH (KAFKA_TOPIC='target_topic') AS
        SELECT *,
            STRUCT("lat" := LOCATION->LAT, "lon" := LOCATION->LON) AS "location_example_01",
            CAST(LOCATION->LAT AS VARCHAR) + ',' + CAST(LOCATION->LON AS VARCHAR) AS "location_example_02"
        FROM SOURCE_STREAM;
    
  5. Create an index template for Elasticsearch if the index doesn't already have a geo_point mapping declared. Here it will match any index whose name begins with target:

    curl --silent --show-error -XPUT -H 'Content-Type: application/json' \
        http://localhost:9200/_index_template/rmoff_template01/ \
        -d'{
            "index_patterns": [ "target*" ],
            "template": {
                "mappings": {
                    "properties": {
                        "location_example_01": {
                            "type": "geo_point"
                        },
                        "location_example_02": {
                            "type": "geo_point"
                        }
                    }
                }
            } }'
    
  6. Use Kafka Connect to stream the data from Kafka to Elasticsearch. You can configure the connector with the native Kafka Connect REST API, or directly from ksqlDB itself:

    CREATE SINK CONNECTOR SINK_ELASTIC_01 WITH (
    'connector.class'                     = 'io.confluent.connect.elasticsearch.ElasticsearchSinkConnector',
    'topics'                              = 'target_topic',
    'key.converter'                       = 'org.apache.kafka.connect.storage.StringConverter',
    'value.converter'                     = 'org.apache.kafka.connect.json.JsonConverter',
    'value.converter.schemas.enable'      = 'false',
    'connection.url'                      = 'http://elasticsearch:9200',
    'type.name'                           = '_doc',
    'key.ignore'                          = 'true',
    'schema.ignore'                       = 'true');
    
  7. Check the mappings in the new Elasticsearch index:

    curl -XGET --silent --show-error "http://localhost:9200/target_topic/_mappings" | jq '.'
    {
    "target_topic": {
        "mappings": {
        "properties": {
            "CREATED": {
            "type": "date"
            },
            "ID": {
            "type": "text",
            "fields": {
                "keyword": {
                "type": "keyword",
                "ignore_above": 256
                }
            }
            },
            "LASTUPDATED": {
            "type": "date"
            },
            "LOCATION": {
            "properties": {
                "LAT": {
                "type": "float"
                },
                "LON": {
                "type": "float"
                }
            }
            },
            "PLACETYPE": {
            "type": "text",
            "fields": {
                "keyword": {
                "type": "keyword",
                "ignore_above": 256
                }
            }
            },
            "TIMESTAMP": {
            "type": "date"
            },
            "TYPE": {
            "type": "text",
            "fields": {
                "keyword": {
                "type": "keyword",
                "ignore_above": 256
                }
            }
            },
            "location_example_01": {
            "type": "geo_point"
            },
            "location_example_02": {
            "type": "geo_point"
            }
        }
        }
    }
    }
    
  8. View the data
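
As a check on what step 4 writes to the topic, the Python sketch below derives both location values from the sample message: the object form with lower-case `lat`/`lon` keys (like the STRUCT expression) and the `"lat,lon"` string form (like the CAST concatenation). Note that the string form puts latitude first. The helper names are hypothetical, and the assumption that ksqlDB's DOUBLE-to-VARCHAR cast prints the same digits as Python's `str()` is unverified.

```python
import json

def to_geo_object(location: dict) -> dict:
    """Like STRUCT("lat" := LOCATION->LAT, "lon" := LOCATION->LON): object with lower-case keys."""
    return {"lat": location["Lat"], "lon": location["Lon"]}

def to_geo_string(location: dict) -> str:
    """Like CAST(LAT AS VARCHAR) + ',' + CAST(LON AS VARCHAR): the "lat,lon" string form.

    Assumes the cast renders the same digits as Python's str() for these values.
    """
    return f"{location['Lat']},{location['Lon']}"

def in_range(lat: float, lon: float) -> bool:
    """geo_point coordinates must satisfy -90 <= lat <= 90 and -180 <= lon <= 180."""
    return -90.0 <= lat <= 90.0 and -180.0 <= lon <= 180.0

sample = json.loads('{"Location": {"Lat": 43.7575119, "Lon": 11.2921363}}')
loc = sample["Location"]
doc = {
    "location_example_01": to_geo_object(loc),
    "location_example_02": to_geo_string(loc),
}
assert in_range(loc["Lat"], loc["Lon"])
print(json.dumps(doc))
```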