Kafka/PubSub 连接器:示例管道:错误任务转换字节 [],无法识别的令牌,期待('true'、'false' 或 'null')
Kafka/PubSub connector: Example pipeline: ERROR Task Converting byte[], Unrecognized token, was expecting ('true', 'false' or 'null')
我正在使用 kafka_2.11-0.10.2.1
和 google here 提供的 pubsub 连接器。我所要做的就是使用独立连接器将数据从 Kafka 主题推送到 PubSub。我遵循了所有应该执行的步骤:
- 产生了
cps-kafka-connector.jar
- 在kafka的
config
目录中添加了cps-sink-connector.properties
文件。该文件如下所示:
name=CPSConnector
connector.class=com.google.pubsub.kafka.sink.CloudPubSubSinkConnector
tasks.max=10
topics=kafka_topic
cps.topic=pubsub_topic
cps.project=my_gcp_project_12345
- 我确定我没有在
connect-standalone.properties
: 中启用任何转换器
key.converter.schemas.enable=false
value.converter.schemas.enable=false
- 我创建了一个主题
kafka_topic
并发送了一些消息如下:
$ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic kafka_topic
$ hello streams
$ kafka streams rock
- 我运行连接器如下:
$ bin/connect-standalone.sh config/connect-standalone.properties config/cps-sink-connector.properties
而本意是 运行:
$ gcloud beta pubsub subscriptions pull subscription_to_pubsub_topic
收集这些消息。但是,出现以下错误,我无法理解它们。有什么想法吗?我使用了错误的输入吗?正确的样本输入是什么?
[2017-05-04 17:34:40,898] INFO Discovered coordinator 10.33.19.146:9092 (id: 2147483647 rack: null) for group connect-CPSConnector. (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:586)
[2017-05-04 17:34:40,899] INFO Revoking previously assigned partitions [] for group connect-CPSConnector (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:397)
[2017-05-04 17:34:40,900] INFO (Re-)joining group connect-CPSConnector (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:420)
[2017-05-04 17:34:40,936] ERROR Task CPSConnector-4 threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:141)
org.apache.kafka.connect.errors.DataException: Converting byte[] to Kafka Connect data failed due to serialization error:
at org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:305)
at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:401)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:249)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:179)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:148)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:139)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:182)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.kafka.common.errors.SerializationException: com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'hello': was expecting ('true', 'false' or 'null')
at [Source: [B@3c06c37d; line: 1, column: 11]
Caused by: com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'hello': was expecting ('true', 'false' or 'null')
at [Source: [B@3c06c37d; line: 1, column: 11]
at com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:1702)
at com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:558)
at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._reportInvalidToken(UTF8StreamJsonParser.java:3524)
at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._handleUnexpectedValue(UTF8StreamJsonParser.java:2686)
at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._nextTokenNotInObject(UTF8StreamJsonParser.java:878)
at com.fasterxml.jackson.core.json.UTF8StreamJsonParser.nextToken(UTF8StreamJsonParser.java:772)
at com.fasterxml.jackson.databind.ObjectMapper._initForReading(ObjectMapper.java:3834)
at com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:3783)
at com.fasterxml.jackson.databind.ObjectMapper.readTree(ObjectMapper.java:2404)
at org.apache.kafka.connect.json.JsonDeserializer.deserialize(JsonDeserializer.java:50)
at org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:303)
at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:401)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:249)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:179)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:148)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:139)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:182)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
[2017-05-04 17:34:40,941] ERROR Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:142)
[2017-05-04 17:34:43,837] INFO Revoking previously assigned partitions [] for group connect-CPSConnector (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:397)
[2017-05-04 17:34:43,838] INFO (Re-)joining group connect-CPSConnector (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:420)
[2017-05-04 17:34:43,846] INFO Successfully joined group connect-CPSConnector with generation 3 (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:388)
[2017-05-04 17:34:43,846] INFO Successfully joined group connect-CPSConnector with generation 3 (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:388)
[2017-05-04 17:34:43,847] INFO Setting newly assigned partitions [] for group connect-CPSConnector (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:256)
[2017-05-04 17:34:43,847] INFO Successfully joined group connect-CPSConnector with generation 3 (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:388)
[2017-05-04 17:34:43,848] INFO Setting newly assigned partitions [] for group connect-CPSConnector (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:256)
[2017-05-04 17:34:43,850] INFO Successfully joined group connect-CPSConnector with generation 3 (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:388)
[2017-05-04 17:34:43,851] INFO Successfully joined group connect-CPSConnector with generation 3 (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:388)
[2017-05-04 17:34:43,851] INFO Successfully joined group connect-CPSConnector with generation 3 (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:388)
[2017-05-04 17:34:43,851] INFO Successfully joined group connect-CPSConnector with generation 3 (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:388)
[2017-05-04 17:34:43,853] INFO Setting newly assigned partitions [] for group connect-CPSConnector (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:256)
[2017-05-04 17:34:43,846] INFO Successfully joined group connect-CPSConnector with generation 3 (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:388)
[2017-05-04 17:34:43,851] INFO Setting newly assigned partitions [test8-0] for group connect-CPSConnector (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:256)
[2017-05-04 17:34:43,851] INFO Setting newly assigned partitions [] for group connect-CPSConnector (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:256)
[2017-05-04 17:34:43,851] INFO Setting newly assigned partitions [] for group connect-CPSConnector (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:256)
[2017-05-04 17:34:43,850] INFO Successfully joined group connect-CPSConnector with generation 3 (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:388)
[2017-05-04 17:34:43,856] INFO Setting newly assigned partitions [] for group connect-CPSConnector (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:256)
[2017-05-04 17:34:43,846] INFO Setting newly assigned partitions [] for group connect-CPSConnector (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:256)
[2017-05-04 17:34:43,854] INFO Setting newly assigned partitions [] for group connect-CPSConnector (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:256)
[2017-05-04 17:34:43,862] ERROR Task CPSConnector-0 threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:141)
org.apache.kafka.connect.errors.DataException: Converting byte[] to Kafka Connect data failed due to serialization error:
at org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:305)
at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:401)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:249)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:179)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:148)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:139)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:182)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.kafka.common.errors.SerializationException: com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'hello': was expecting ('true', 'false' or 'null')
at [Source: [B@32a6e3e6; line: 1, column: 11]
Caused by: com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'hello': was expecting ('true', 'false' or 'null')
at [Source: [B@32a6e3e6; line: 1, column: 11]
at com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:1702)
at com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:558)
at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._reportInvalidToken(UTF8StreamJsonParser.java:3524)
...
connect-standalone.properties
中的这些行不会禁用转换器:
key.converter.schemas.enable=false
value.converter.schemas.enable=false
他们禁止将架构包含在某些转换器中,例如 JSON 转换器。您感兴趣的行是:
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter
和value.converter
字段分别表示Kafka消息的key和value中数据的格式。由于您发布的消息无效 JSON,您会看到此错误。您需要将这些转换器设置为 StringConverter:
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter
我正在使用 kafka_2.11-0.10.2.1
和 google here 提供的 pubsub 连接器。我所要做的就是使用独立连接器将数据从 Kafka 主题推送到 PubSub。我遵循了所有应该执行的步骤:
- 产生了
cps-kafka-connector.jar
- 在kafka的
config
目录中添加了cps-sink-connector.properties
文件。该文件如下所示:
name=CPSConnector connector.class=com.google.pubsub.kafka.sink.CloudPubSubSinkConnector tasks.max=10 topics=kafka_topic cps.topic=pubsub_topic cps.project=my_gcp_project_12345
- 我确定我没有在
connect-standalone.properties
: 中启用任何转换器
key.converter.schemas.enable=false value.converter.schemas.enable=false
- 我创建了一个主题
kafka_topic
并发送了一些消息如下:
$ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic kafka_topic $ hello streams $ kafka streams rock
- 我运行连接器如下:
$ bin/connect-standalone.sh config/connect-standalone.properties config/cps-sink-connector.properties
而本意是 运行:
$ gcloud beta pubsub subscriptions pull subscription_to_pubsub_topic
收集这些消息。但是,出现以下错误,我无法理解它们。有什么想法吗?我使用了错误的输入吗?正确的样本输入是什么?
[2017-05-04 17:34:40,898] INFO Discovered coordinator 10.33.19.146:9092 (id: 2147483647 rack: null) for group connect-CPSConnector. (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:586)
[2017-05-04 17:34:40,899] INFO Revoking previously assigned partitions [] for group connect-CPSConnector (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:397)
[2017-05-04 17:34:40,900] INFO (Re-)joining group connect-CPSConnector (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:420)
[2017-05-04 17:34:40,936] ERROR Task CPSConnector-4 threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:141)
org.apache.kafka.connect.errors.DataException: Converting byte[] to Kafka Connect data failed due to serialization error:
at org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:305)
at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:401)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:249)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:179)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:148)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:139)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:182)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.kafka.common.errors.SerializationException: com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'hello': was expecting ('true', 'false' or 'null')
at [Source: [B@3c06c37d; line: 1, column: 11]
Caused by: com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'hello': was expecting ('true', 'false' or 'null')
at [Source: [B@3c06c37d; line: 1, column: 11]
at com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:1702)
at com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:558)
at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._reportInvalidToken(UTF8StreamJsonParser.java:3524)
at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._handleUnexpectedValue(UTF8StreamJsonParser.java:2686)
at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._nextTokenNotInObject(UTF8StreamJsonParser.java:878)
at com.fasterxml.jackson.core.json.UTF8StreamJsonParser.nextToken(UTF8StreamJsonParser.java:772)
at com.fasterxml.jackson.databind.ObjectMapper._initForReading(ObjectMapper.java:3834)
at com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:3783)
at com.fasterxml.jackson.databind.ObjectMapper.readTree(ObjectMapper.java:2404)
at org.apache.kafka.connect.json.JsonDeserializer.deserialize(JsonDeserializer.java:50)
at org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:303)
at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:401)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:249)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:179)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:148)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:139)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:182)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
[2017-05-04 17:34:40,941] ERROR Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:142)
[2017-05-04 17:34:43,837] INFO Revoking previously assigned partitions [] for group connect-CPSConnector (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:397)
[2017-05-04 17:34:43,838] INFO (Re-)joining group connect-CPSConnector (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:420)
[2017-05-04 17:34:43,846] INFO Successfully joined group connect-CPSConnector with generation 3 (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:388)
[2017-05-04 17:34:43,846] INFO Successfully joined group connect-CPSConnector with generation 3 (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:388)
[2017-05-04 17:34:43,847] INFO Setting newly assigned partitions [] for group connect-CPSConnector (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:256)
[2017-05-04 17:34:43,847] INFO Successfully joined group connect-CPSConnector with generation 3 (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:388)
[2017-05-04 17:34:43,848] INFO Setting newly assigned partitions [] for group connect-CPSConnector (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:256)
[2017-05-04 17:34:43,850] INFO Successfully joined group connect-CPSConnector with generation 3 (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:388)
[2017-05-04 17:34:43,851] INFO Successfully joined group connect-CPSConnector with generation 3 (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:388)
[2017-05-04 17:34:43,851] INFO Successfully joined group connect-CPSConnector with generation 3 (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:388)
[2017-05-04 17:34:43,851] INFO Successfully joined group connect-CPSConnector with generation 3 (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:388)
[2017-05-04 17:34:43,853] INFO Setting newly assigned partitions [] for group connect-CPSConnector (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:256)
[2017-05-04 17:34:43,846] INFO Successfully joined group connect-CPSConnector with generation 3 (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:388)
[2017-05-04 17:34:43,851] INFO Setting newly assigned partitions [test8-0] for group connect-CPSConnector (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:256)
[2017-05-04 17:34:43,851] INFO Setting newly assigned partitions [] for group connect-CPSConnector (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:256)
[2017-05-04 17:34:43,851] INFO Setting newly assigned partitions [] for group connect-CPSConnector (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:256)
[2017-05-04 17:34:43,850] INFO Successfully joined group connect-CPSConnector with generation 3 (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:388)
[2017-05-04 17:34:43,856] INFO Setting newly assigned partitions [] for group connect-CPSConnector (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:256)
[2017-05-04 17:34:43,846] INFO Setting newly assigned partitions [] for group connect-CPSConnector (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:256)
[2017-05-04 17:34:43,854] INFO Setting newly assigned partitions [] for group connect-CPSConnector (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:256)
[2017-05-04 17:34:43,862] ERROR Task CPSConnector-0 threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:141)
org.apache.kafka.connect.errors.DataException: Converting byte[] to Kafka Connect data failed due to serialization error:
at org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:305)
at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:401)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:249)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:179)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:148)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:139)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:182)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.kafka.common.errors.SerializationException: com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'hello': was expecting ('true', 'false' or 'null')
at [Source: [B@32a6e3e6; line: 1, column: 11]
Caused by: com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'hello': was expecting ('true', 'false' or 'null')
at [Source: [B@32a6e3e6; line: 1, column: 11]
at com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:1702)
at com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:558)
at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._reportInvalidToken(UTF8StreamJsonParser.java:3524)
...
connect-standalone.properties
中的这些行不会禁用转换器:
key.converter.schemas.enable=false
value.converter.schemas.enable=false
他们禁止将架构包含在某些转换器中,例如 JSON 转换器。您感兴趣的行是:
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter
和value.converter
字段分别表示Kafka消息的key和value中数据的格式。由于您发布的消息无效 JSON,您会看到此错误。您需要将这些转换器设置为 StringConverter:
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter