Kafka/PubSub connector: Example pipeline: ERROR Task Converting byte[], Unrecognized token, was expecting ('true', 'false' or 'null')

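I am using kafka_2.11-0.10.2.1 and the pubsub connector provided by Google. All I want to do is push data from a Kafka topic to PubSub using a standalone connector. I followed all the steps I was supposed to: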

  1. Built cps-kafka-connector.jar
  2. Added a cps-sink-connector.properties file to Kafka's config directory. The file looks like this:
name=CPSConnector
connector.class=com.google.pubsub.kafka.sink.CloudPubSubSinkConnector
tasks.max=10
topics=kafka_topic
cps.topic=pubsub_topic
cps.project=my_gcp_project_12345
  3. I made sure that no converters are enabled in connect-standalone.properties:
key.converter.schemas.enable=false
value.converter.schemas.enable=false
  4. I created a topic kafka_topic and sent some messages as follows:
$ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic kafka_topic
$ hello streams
$ kafka streams rock
  5. I ran the connector as follows:
$ bin/connect-standalone.sh config/connect-standalone.properties config/cps-sink-connector.properties

The intent was then to run:

$ gcloud beta pubsub subscriptions pull subscription_to_pubsub_topic

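to collect these messages. However, I get the following errors and cannot make sense of them. Any ideas? Am I using the wrong input? What would a correct sample input look like?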

    [2017-05-04 17:34:40,898] INFO Discovered coordinator 10.33.19.146:9092 (id: 2147483647 rack: null) for group connect-CPSConnector. (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:586)
  [2017-05-04 17:34:40,899] INFO Revoking previously assigned partitions [] for group connect-CPSConnector (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:397)
  [2017-05-04 17:34:40,900] INFO (Re-)joining group connect-CPSConnector (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:420)
  [2017-05-04 17:34:40,936] ERROR Task CPSConnector-4 threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:141)
  org.apache.kafka.connect.errors.DataException: Converting byte[] to Kafka Connect data failed due to serialization error:
    at org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:305)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:401)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:249)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:179)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:148)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:139)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:182)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
  Caused by: org.apache.kafka.common.errors.SerializationException: com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'hello': was expecting ('true', 'false' or 'null')
   at [Source: [B@3c06c37d; line: 1, column: 11]
  Caused by: com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'hello': was expecting ('true', 'false' or 'null')
   at [Source: [B@3c06c37d; line: 1, column: 11]
    at com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:1702)
    at com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:558)
    at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._reportInvalidToken(UTF8StreamJsonParser.java:3524)
    at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._handleUnexpectedValue(UTF8StreamJsonParser.java:2686)
    at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._nextTokenNotInObject(UTF8StreamJsonParser.java:878)
    at com.fasterxml.jackson.core.json.UTF8StreamJsonParser.nextToken(UTF8StreamJsonParser.java:772)
    at com.fasterxml.jackson.databind.ObjectMapper._initForReading(ObjectMapper.java:3834)
    at com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:3783)
    at com.fasterxml.jackson.databind.ObjectMapper.readTree(ObjectMapper.java:2404)
    at org.apache.kafka.connect.json.JsonDeserializer.deserialize(JsonDeserializer.java:50)
    at org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:303)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:401)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:249)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:179)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:148)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:139)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:182)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
  [2017-05-04 17:34:40,941] ERROR Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:142)
  [2017-05-04 17:34:43,837] INFO Revoking previously assigned partitions [] for group connect-CPSConnector (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:397)
  [2017-05-04 17:34:43,838] INFO (Re-)joining group connect-CPSConnector (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:420)
  [2017-05-04 17:34:43,846] INFO Successfully joined group connect-CPSConnector with generation 3 (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:388)
  [2017-05-04 17:34:43,846] INFO Successfully joined group connect-CPSConnector with generation 3 (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:388)
  [2017-05-04 17:34:43,847] INFO Setting newly assigned partitions [] for group connect-CPSConnector (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:256)
  [2017-05-04 17:34:43,847] INFO Successfully joined group connect-CPSConnector with generation 3 (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:388)
  [2017-05-04 17:34:43,848] INFO Setting newly assigned partitions [] for group connect-CPSConnector (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:256)
  [2017-05-04 17:34:43,850] INFO Successfully joined group connect-CPSConnector with generation 3 (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:388)
  [2017-05-04 17:34:43,851] INFO Successfully joined group connect-CPSConnector with generation 3 (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:388)
  [2017-05-04 17:34:43,851] INFO Successfully joined group connect-CPSConnector with generation 3 (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:388)
  [2017-05-04 17:34:43,851] INFO Successfully joined group connect-CPSConnector with generation 3 (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:388)
  [2017-05-04 17:34:43,853] INFO Setting newly assigned partitions [] for group connect-CPSConnector (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:256)
  [2017-05-04 17:34:43,846] INFO Successfully joined group connect-CPSConnector with generation 3 (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:388)
  [2017-05-04 17:34:43,851] INFO Setting newly assigned partitions [test8-0] for group connect-CPSConnector (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:256)
  [2017-05-04 17:34:43,851] INFO Setting newly assigned partitions [] for group connect-CPSConnector (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:256)
  [2017-05-04 17:34:43,851] INFO Setting newly assigned partitions [] for group connect-CPSConnector (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:256)
  [2017-05-04 17:34:43,850] INFO Successfully joined group connect-CPSConnector with generation 3 (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:388)
  [2017-05-04 17:34:43,856] INFO Setting newly assigned partitions [] for group connect-CPSConnector (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:256)
  [2017-05-04 17:34:43,846] INFO Setting newly assigned partitions [] for group connect-CPSConnector (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:256)
  [2017-05-04 17:34:43,854] INFO Setting newly assigned partitions [] for group connect-CPSConnector (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:256)
  [2017-05-04 17:34:43,862] ERROR Task CPSConnector-0 threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:141)
  org.apache.kafka.connect.errors.DataException: Converting byte[] to Kafka Connect data failed due to serialization error:
    at org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:305)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:401)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:249)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:179)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:148)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:139)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:182)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
  Caused by: org.apache.kafka.common.errors.SerializationException: com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'hello': was expecting ('true', 'false' or 'null')
   at [Source: [B@32a6e3e6; line: 1, column: 11]
  Caused by: com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'hello': was expecting ('true', 'false' or 'null')
   at [Source: [B@32a6e3e6; line: 1, column: 11]
    at com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:1702)
    at com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:558)
    at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._reportInvalidToken(UTF8StreamJsonParser.java:3524)
    ...

These lines in connect-standalone.properties do not disable the converters:

key.converter.schemas.enable=false
value.converter.schemas.enable=false

They only stop the schema from being included by certain converters, such as the JSON converter. The lines you are interested in are:

key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter

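The key.converter and value.converter fields specify the format of the data in the key and the value of the Kafka message, respectively. Because the messages you published are not valid JSON, you see this error. You need to set these converters to StringConverter: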

key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter
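
To see why the sample messages trip the JSON converter, here is a minimal sketch that checks whether a raw message body parses as JSON. It uses Python's standard json module as a stand-in for the Jackson parser that JsonConverter uses, so treat it as an illustration rather than an exact reproduction; the helper name is_valid_json and the sample payloads are made up for this example.

```python
import json


def is_valid_json(raw: bytes) -> bool:
    """Return True if the raw message bytes parse as a JSON document."""
    try:
        json.loads(raw.decode("utf-8"))
        return True
    except (UnicodeDecodeError, json.JSONDecodeError):
        return False


# Hypothetical message bodies, as they might be typed into the console producer.
samples = [
    b"hello streams",                 # bare text, as sent in the question
    b'"hello streams"',               # a JSON string literal
    b'{"message": "hello streams"}',  # a JSON object
]

for raw in samples:
    verdict = "valid JSON" if is_valid_json(raw) else "not valid JSON"
    print(raw.decode("utf-8"), "->", verdict)
```

The bare text is rejected while the quoted string and the object are accepted. So if you would rather keep JsonConverter, sending quoted strings or JSON objects should get past the parse step; whether the sink connector then handles those values the way you want is worth checking separately.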