连接 Confluent Kafka 和 InfluxDB 时出现 NullPointerException
NullPointerException when connecting Confluent Kafka and InfluxDB
我正在尝试使用 Confluent InfluxDB Sink Connector 将数据从 kafka 主题获取到我的 InfluxDB 中。
首先,我使用nifi将数据从日志文件传输到kafka主题,效果很好。 kafka主题获取数据,如下:
{
"topic": "testDB5",
"key": null,
"value": {
"timestamp": "2019-03-20 01:24:29,461",
"measurement": "INFO",
"thread": "NiFi Web Server-795",
"class": "org.apache.nifi.web.filter.RequestLogger",
"message": "Attempting request for (anonymous)
},
"partition": 0,
"offset": 0
}
然后,我通过 Kafka Connect UI 创建 InfluxDB 接收器连接器,我得到以下异常:
org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:587)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:323)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:226)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:194)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.NullPointerException
at io.confluent.influxdb.InfluxDBSinkTask.put(InfluxDBSinkTask.java:140)
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:565)
... 10 more
但是如果我使用
手动输入数据到另一个主题testDB1
./bin/kafka-avro-console-producer --broker-list localhost:9092 --topic testDB1 --property value.schema='{"type":"record","name":"myrecord","fields":[{"name":"measurement","type":"string"},{"name":"timestamp","type":"string"}]}'
成功了,我的influxDB可以取到数据了
这里是连接配置:
connector.class=io.confluent.influxdb.InfluxDBSinkConnector
influxdb.url=http://myurl
tasks.max=1
topics=testDB5
连接主题testDB1的配置除了主题名称外都是一样的。
nifi有什么问题吗?但是可以很好的向topic传输数据。
当您将 Avro 与 Kafka Connect 一起使用时,Avro 反序列化器期望数据已使用 Avro serialiser 序列化。这就是 kafak-avro-console-producer
使用的内容,这就是您的管道在您使用它时起作用的原因。
This article gives a good background to Avro and the Schema Registry. See also Kafka Connect Deep Dive – Converters and Serialization Explained.
我对Nifi不熟悉,但是看文档好像是AvroRecordSetWriter has the option to use Confluent Schema Registry。猜测您还想将 Schema Write Strategy
设置为 Confluent Schema Registry Reference
.
一旦您可以使用 kafka-avro-console-consumer
使用主题中的数据,那么您就知道它已正确序列化并且可以与您的 Kafka Connect 接收器一起使用。
找到原因了。是因为在Nifi中,我使用PublishKafka_0_10
发布数据到Kafka topic,但是版本太低了!
当我在 ksql 中进行查询时,它说
Input record ConsumerRecord(..data..) has invalid (negative) timestamp.
Possibly because a pre-0.10 producer client was used to write this record to Kafka without embedding a timestamp,
or because the input topic was created before upgrading the Kafka cluster to 0.10+. Use a different TimestampExtractor to process this data.
所以,我将其更改为 PublishKafka_1_0
,然后重新开始,它成功了!我的influxDB可以拿到数据。我无语了。
感谢 Robin Moffatt 的回复,这对我很有帮助。
我正在尝试使用 Confluent InfluxDB Sink Connector 将数据从 kafka 主题获取到我的 InfluxDB 中。
首先,我使用nifi将数据从日志文件传输到kafka主题,效果很好。 kafka主题获取数据,如下:
{
"topic": "testDB5",
"key": null,
"value": {
"timestamp": "2019-03-20 01:24:29,461",
"measurement": "INFO",
"thread": "NiFi Web Server-795",
"class": "org.apache.nifi.web.filter.RequestLogger",
"message": "Attempting request for (anonymous)
},
"partition": 0,
"offset": 0
}
然后,我通过 Kafka Connect UI 创建 InfluxDB 接收器连接器,我得到以下异常:
org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:587)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:323)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:226)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:194)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.NullPointerException
at io.confluent.influxdb.InfluxDBSinkTask.put(InfluxDBSinkTask.java:140)
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:565)
... 10 more
但是如果我使用
手动输入数据到另一个主题testDB1./bin/kafka-avro-console-producer --broker-list localhost:9092 --topic testDB1 --property value.schema='{"type":"record","name":"myrecord","fields":[{"name":"measurement","type":"string"},{"name":"timestamp","type":"string"}]}'
成功了,我的influxDB可以取到数据了
这里是连接配置:
connector.class=io.confluent.influxdb.InfluxDBSinkConnector
influxdb.url=http://myurl
tasks.max=1
topics=testDB5
连接主题testDB1的配置除了主题名称外都是一样的。
nifi有什么问题吗?但是可以很好的向topic传输数据。
当您将 Avro 与 Kafka Connect 一起使用时,Avro 反序列化器期望数据已使用 Avro serialiser 序列化。这就是 kafak-avro-console-producer
使用的内容,这就是您的管道在您使用它时起作用的原因。
This article gives a good background to Avro and the Schema Registry. See also Kafka Connect Deep Dive – Converters and Serialization Explained.
我对Nifi不熟悉,但是看文档好像是AvroRecordSetWriter has the option to use Confluent Schema Registry。猜测您还想将 Schema Write Strategy
设置为 Confluent Schema Registry Reference
.
一旦您可以使用 kafka-avro-console-consumer
使用主题中的数据,那么您就知道它已正确序列化并且可以与您的 Kafka Connect 接收器一起使用。
找到原因了。是因为在Nifi中,我使用PublishKafka_0_10
发布数据到Kafka topic,但是版本太低了!
当我在 ksql 中进行查询时,它说
Input record ConsumerRecord(..data..) has invalid (negative) timestamp.
Possibly because a pre-0.10 producer client was used to write this record to Kafka without embedding a timestamp,
or because the input topic was created before upgrading the Kafka cluster to 0.10+. Use a different TimestampExtractor to process this data.
所以,我将其更改为 PublishKafka_1_0
,然后重新开始,它成功了!我的influxDB可以拿到数据。我无语了。
感谢 Robin Moffatt 的回复,这对我很有帮助。