使用 Avro 从 Logastash 发送到 Kafka
Sending from Logastash to Kafka in with Avro
我正在尝试使用 avro 模式将数据从 logstash 发送到 kafka。
我的 logstash 输出如下:
kafka{
codec => avro {
schema_uri => "/tmp/avro/hadoop.avsc"
}
topic_id => "hadoop_log_processed"
}
我的架构文件如下所示:
{"type": "record",
"name": "hadoop_schema",
"fields": [
{"name": "loglevel", "type": "string"},
{"name": "error_msg", "type": "string"},
{"name": "syslog", "type": ["string", "null"]},
{"name": "javaclass", "type": ["string", "null"]}
]
}
kafka-console-consumer
的输出:
CElORk+gAURvd24gdG8gdGhlIGxhc3QgbWVyZ2UtcGCzcywgd2l0aCA3IHNlZ21lbnRzIGxlZnQgb2YgdG90YWwgc256ZTogMjI4NDI0NDM5IGJ5dGVzAAxbbWFpbl0APm9yZy5hcGFjaGUuaGFkb29wLm1hcHJlZC5NZXJnZXI=
CElORk9kVGFzayAnYXR0ZW1wdF8xNDQ1JDg3NDkxNDQ1XzAwMDFfbV8wMDAwMDRfMCcgZG9uZS4ADFttYWluXQA6t3JnLmFwYWNoZS5oYWRvb6AubWFwcmVkLlRhc2s=
CElORk9kVGFzayAnYXR0ZW1wdF8xNDQ1JDg3NDkxNDQ1XzAwMDFfbV8wMDAwMDRfMCcgZG9uZS4ADFttYWluXQA6t3JnLmFwYWNoZS5oYWRvb6AubWFwcmVkLlRhc2s=
CElORk9OVGFza0hlYAJ0YmVhdEhhbmRsZXIgdGhyZWFkIGludGVycnVwdGVkAERbVGFza0hlYXJdYmVhdEhhbmRsZXIgUGluZ0NoZWNrZXJdAG5vcmcuYVBhY2hlLmhhZG9vcC5tYXByZWR1Y2UudjIuYXBwLlRhc2tIZWFydGJ3YXRIYW5kbGVy
我的连接器中也出现以下错误:
org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:178)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:104)
at org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:488)
at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:465)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:321)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:224)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:192)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:177)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:227)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: org.apache.kafka.connect.errors.DataException: Failed to deserialize data for topic hadoop_log_processed to Avro:
at io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:110)
at org.apache.kafka.connect.storage.Converter.toConnectData(Converter.java:86)
at org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord(WorkerSinkTask.java:488)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:128)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:162)
... 13 more
Caused by: org.apache.kafka.common.errors.SerializationException: Unknown magic byte!
我知道我对 logstash 站点上的数据进行了编码。我是否必须在输入 kafka 期间解码消息,或者我可以 decode/deserialize 连接器配置中的数据?
有没有办法在 logstash 站点上禁用编码?我读到了 base64_encoding 个选项,但似乎没有这个选项。
您在这里遇到的问题是 Logstash 的 Avro 编解码器没有将数据序列化为 Confluent Schema Registry Avro 反序列化器期望的 Avro 形式。
虽然 Logstash 采用 avsc 并将数据编码为二进制形式,但 Confluent Schema Registry [de]serialiser 直接从注册表(而不是 avsc
文件)存储和检索模式。
因此,当您收到 Failed to deserialize data … SerializationException: Unknown magic byte!
时,这是 Avro 反序列化程序表示它无法将数据识别为使用 Schema Registry 序列化程序序列化的 Avro。
我快速 Google 发现 this codec 看起来它支持 Schema Registry(因此支持 Kafka Connect,以及任何其他以这种方式反序列化 Avro 数据的消费者)。
或者,将您的数据作为 JSON 写入 Kafka,然后使用 Kafka Connect 中的 org.apache.kafka.connect.json.JsonConverter
从主题中读取它。
参考:
我正在尝试使用 avro 模式将数据从 logstash 发送到 kafka。
我的 logstash 输出如下:
kafka{
codec => avro {
schema_uri => "/tmp/avro/hadoop.avsc"
}
topic_id => "hadoop_log_processed"
}
我的架构文件如下所示:
{"type": "record",
"name": "hadoop_schema",
"fields": [
{"name": "loglevel", "type": "string"},
{"name": "error_msg", "type": "string"},
{"name": "syslog", "type": ["string", "null"]},
{"name": "javaclass", "type": ["string", "null"]}
]
}
kafka-console-consumer
的输出:
CElORk+gAURvd24gdG8gdGhlIGxhc3QgbWVyZ2UtcGCzcywgd2l0aCA3IHNlZ21lbnRzIGxlZnQgb2YgdG90YWwgc256ZTogMjI4NDI0NDM5IGJ5dGVzAAxbbWFpbl0APm9yZy5hcGFjaGUuaGFkb29wLm1hcHJlZC5NZXJnZXI=
CElORk9kVGFzayAnYXR0ZW1wdF8xNDQ1JDg3NDkxNDQ1XzAwMDFfbV8wMDAwMDRfMCcgZG9uZS4ADFttYWluXQA6t3JnLmFwYWNoZS5oYWRvb6AubWFwcmVkLlRhc2s=
CElORk9kVGFzayAnYXR0ZW1wdF8xNDQ1JDg3NDkxNDQ1XzAwMDFfbV8wMDAwMDRfMCcgZG9uZS4ADFttYWluXQA6t3JnLmFwYWNoZS5oYWRvb6AubWFwcmVkLlRhc2s=
CElORk9OVGFza0hlYAJ0YmVhdEhhbmRsZXIgdGhyZWFkIGludGVycnVwdGVkAERbVGFza0hlYXJdYmVhdEhhbmRsZXIgUGluZ0NoZWNrZXJdAG5vcmcuYVBhY2hlLmhhZG9vcC5tYXByZWR1Y2UudjIuYXBwLlRhc2tIZWFydGJ3YXRIYW5kbGVy
我的连接器中也出现以下错误:
org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:178)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:104)
at org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:488)
at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:465)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:321)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:224)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:192)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:177)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:227)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: org.apache.kafka.connect.errors.DataException: Failed to deserialize data for topic hadoop_log_processed to Avro:
at io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:110)
at org.apache.kafka.connect.storage.Converter.toConnectData(Converter.java:86)
at org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord(WorkerSinkTask.java:488)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:128)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:162)
... 13 more
Caused by: org.apache.kafka.common.errors.SerializationException: Unknown magic byte!
我知道我对 logstash 站点上的数据进行了编码。我是否必须在输入 kafka 期间解码消息,或者我可以 decode/deserialize 连接器配置中的数据?
有没有办法在 logstash 站点上禁用编码?我读到了 base64_encoding 个选项,但似乎没有这个选项。
您在这里遇到的问题是 Logstash 的 Avro 编解码器没有将数据序列化为 Confluent Schema Registry Avro 反序列化器期望的 Avro 形式。
虽然 Logstash 采用 avsc 并将数据编码为二进制形式,但 Confluent Schema Registry [de]serialiser 直接从注册表(而不是 avsc
文件)存储和检索模式。
因此,当您收到 Failed to deserialize data … SerializationException: Unknown magic byte!
时,这是 Avro 反序列化程序表示它无法将数据识别为使用 Schema Registry 序列化程序序列化的 Avro。
我快速 Google 发现 this codec 看起来它支持 Schema Registry(因此支持 Kafka Connect,以及任何其他以这种方式反序列化 Avro 数据的消费者)。
或者,将您的数据作为 JSON 写入 Kafka,然后使用 Kafka Connect 中的 org.apache.kafka.connect.json.JsonConverter
从主题中读取它。
参考: