mapper_parsing_exception in Elasticsearch 6.5.4

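I am currently running an integration of the Debezium MySQL source connector (v0.9.0F), Kafka Connect (Confluent Platform v5.1.2), and Elasticsearch (v6.5.4) on the sink side. The source connector parses the MySQL tables successfully (I checked this in the Kafka logs), but on the sink side (ES) the following error occurs for certain tables and specific columns.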

"type\":\"mapper_parsing_exception\",
\"reason\":\"Mapping definition for [column1] has unsupported parameters:  [null_value : 1970-01-01T00:00:00Z]\"

The DDL for column1 is:

`column1` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP

However, no row in the table has a value equal to 1970-01-01T00:00:00Z (not sure whether that matters).

The same happens for a column in another table:

"type\":\"mapper_parsing_exception\",
\"reason\":\"Mapping definition for [column2] has unsupported parameters:  [null_value : ---]\"

The DDL for column2 is:

`column2` char(3) COLLATE utf8_unicode_ci NOT NULL DEFAULT '---'

The stack trace of the error:

 at io.confluent.connect.elasticsearch.jest.JestElasticsearchClient.createMapping(JestElasticsearchClient.java:253)
        at io.confluent.connect.elasticsearch.Mapping.createMapping(Mapping.java:65)
        at io.confluent.connect.elasticsearch.ElasticsearchWriter.write(ElasticsearchWriter.java:257)
        at io.confluent.connect.elasticsearch.ElasticsearchSinkTask.put(ElasticsearchSinkTask.java:161)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:565)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:323)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:226)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:194)
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
[2019-02-22 15:48:40,217] ERROR WorkerSinkTask{id=stage_refdata_company_essink-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:177)
org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
        at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:587)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:323)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:226)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:194)
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.connect.errors.ConnectException: Cannot create mapping 
<mapping of table explanation>
 at io.confluent.connect.elasticsearch.jest.JestElasticsearchClient.createMapping(JestElasticsearchClient.java:253)
        at io.confluent.connect.elasticsearch.Mapping.createMapping(Mapping.java:65)
        at io.confluent.connect.elasticsearch.ElasticsearchWriter.write(ElasticsearchWriter.java:257)
        at io.confluent.connect.elasticsearch.ElasticsearchSinkTask.put(ElasticsearchSinkTask.java:161)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:565)
        ... 10 more

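The data exists in the table, and the column default is '---'.

  1. Why does ES throw an error on these two values?
  2. column2 is explicitly declared as char(3), so '---' should be an acceptable value.

Could you help resolve these errors?

How can I avoid these errors for other custom default values in the future?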

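  1. I would verify that the mappings of these indices in Elasticsearch are what you expect: column1 should be a date field with the expected format, and column2 should be a keyword field.

  2. Make sure the sink handles null/missing values as expected and that the default values match the Elasticsearch template. Indexing "---" into a keyword field and "1970-01-01T00:00:00Z" into a date field should not be a problem, but the sink may not be converting to the default values as expected.

  3. You can also check the ES log files for the full mapper parsing exception, which will show in detail what was being indexed and why it failed.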
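
The mapping check in point 1 can be sketched in a few lines of Python. This is only an illustration under stated assumptions. It relies on the documented fact that Elasticsearch `text` fields do not accept the `null_value` parameter, while `keyword` and `date` fields do. Whether the sink actually generated `text` for column1 and column2 is a guess, because the rejected mapping is elided in the question. The helper `find_unsupported_null_values` and the sample mapping are hypothetical names and data made up for this example.

```python
# Field types treated here as NOT accepting `null_value`. Per the Elasticsearch
# mapping docs, `text` is one such type; keyword, date, numeric, boolean and ip
# fields do accept it.
TYPES_WITHOUT_NULL_VALUE = {"text"}


def find_unsupported_null_values(properties, prefix=""):
    """Return (field_path, type, null_value) for every field that sets
    `null_value` on a type that does not support it."""
    problems = []
    for name, definition in properties.items():
        path = prefix + name
        # A definition without an explicit "type" is an object field.
        field_type = definition.get("type", "object")
        if "null_value" in definition and field_type in TYPES_WITHOUT_NULL_VALUE:
            problems.append((path, field_type, definition["null_value"]))
        # Recurse into sub-fields of object fields.
        nested = definition.get("properties")
        if isinstance(nested, dict):
            problems.extend(find_unsupported_null_values(nested, path + "."))
    return problems


# Hypothetical "properties" section of a mapping. This is NOT the mapping from
# the question, which is not shown there.
sample_properties = {
    "column1": {"type": "text", "null_value": "1970-01-01T00:00:00Z"},
    "column2": {"type": "text", "null_value": "---"},
    "column3": {"type": "keyword", "null_value": "---"},
    "column4": {"type": "date", "null_value": "1970-01-01T00:00:00Z"},
}

for path, field_type, null_value in find_unsupported_null_values(sample_properties):
    print(f"{path}: type {field_type!r} does not accept null_value {null_value!r}")
```

To use it on your own data, paste in the `properties` section of the mapping printed after "Cannot create mapping" in the Connect log. If the flagged fields turn out to be `text`, one option would be to create the index or an index template yourself with `keyword` and `date` types before starting the sink, though that depends on how your connector is configured.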