mapper_parsing_exception in Elasticsearch 6.5.4
I am currently running an integration of:
Debezium MySQL connector source (v0.9.0F),
Kafka Connect (Confluent Platform v5.1.2), and
Elasticsearch (v6.5.4) on the sink side.
The source connector parses the tables in MySQL successfully (verified in the Kafka logs), but on the sink side (ES) the following errors occur for certain tables and specific columns.
"type\":\"mapper_parsing_exception\",
\"reason\":\"Mapping definition for [column1] has unsupported parameters: [null_value : 1970-01-01T00:00:00Z]\"
The DDL for column1 is as follows:
`column1` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP
However, there is no data in the table equal to 1970-01-01T00:00:00Z (not sure whether that matters).
The same happens with a column in another table:
"type\":\"mapper_parsing_exception\",
\"reason\":\"Mapping definition for [column2] has unsupported parameters: [null_value : ---]\"
The DDL for column2:
`column2` char(3) COLLATE utf8_unicode_ci NOT NULL DEFAULT '---'
Call stack of the error:
at io.confluent.connect.elasticsearch.jest.JestElasticsearchClient.createMapping(JestElasticsearchClient.java:253)
at io.confluent.connect.elasticsearch.Mapping.createMapping(Mapping.java:65)
at io.confluent.connect.elasticsearch.ElasticsearchWriter.write(ElasticsearchWriter.java:257)
at io.confluent.connect.elasticsearch.ElasticsearchSinkTask.put(ElasticsearchSinkTask.java:161)
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:565)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:323)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:226)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:194)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
[2019-02-22 15:48:40,217] ERROR WorkerSinkTask{id=stage_refdata_company_essink-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:177)
org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:587)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:323)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:226)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:194)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.connect.errors.ConnectException: Cannot create mapping
<mapping of table explanation>
at io.confluent.connect.elasticsearch.jest.JestElasticsearchClient.createMapping(JestElasticsearchClient.java:253)
at io.confluent.connect.elasticsearch.Mapping.createMapping(Mapping.java:65)
at io.confluent.connect.elasticsearch.ElasticsearchWriter.write(ElasticsearchWriter.java:257)
at io.confluent.connect.elasticsearch.ElasticsearchSinkTask.put(ElasticsearchSinkTask.java:161)
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:565)
... 10 more
Data with the default value '---' does exist in the table.
- Why does ES throw errors for these two values? column2 is explicitly declared as char(3), so '---' should be an acceptable value.
- Can you help resolve these errors?
- How can these errors be avoided for other custom default values in the future?
I would verify that the mappings of these indices in Elasticsearch are as expected: column1 should be a date datatype with the expected format, and column2 should be a keyword.
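For example, the current mapping can be inspected with a request along these lines (the index name is a placeholder; substitute the index the sink actually writes to):

curl -XGET 'http://localhost:9200/<your_index>/_mapping?pretty'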
You need to make sure that the sink handles null/missing values as expected and that the defaults match the Elasticsearch template. Indexing '---' into a keyword field or '1970-01-01T00:00:00Z' into a date field should not be a problem in itself, so possibly the sink is not converting the defaults as expected. Note that the stack trace fails inside createMapping: the error occurs while the connector builds the index mapping from the Kafka schema (where the column defaults surface as null_value), not while documents are indexed, which is why it does not matter whether any rows actually contain those values.
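As a sanity check, a mapping like the following (a minimal sketch; the index name and type name are hypothetical) is accepted by ES 6.x, which confirms that the values themselves are valid null_value settings when the field types are right:

curl -XPUT 'http://localhost:9200/test_defaults' -H 'Content-Type: application/json' -d '
{
  "mappings": {
    "_doc": {
      "properties": {
        "column1": { "type": "date",    "null_value": "1970-01-01T00:00:00Z" },
        "column2": { "type": "keyword", "null_value": "---" }
      }
    }
  }
}'

If this request succeeds while the connector-generated mapping fails, the difference between the two mappings points at the field types the sink chose: in ES 6.x, null_value is supported on keyword and date fields but not on text fields.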
You could also check the Elasticsearch log files for the full mapper parsing exception entry; that will show in detail what is being sent for indexing and why it fails.
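For instance (the log path is an assumption based on a default Elasticsearch installation):

grep -B 2 -A 20 'mapper_parsing_exception' /var/log/elasticsearch/elasticsearch.log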