Logstash with Kafka: Unable to decode avro
I am trying to consume serialized Avro events from a Kafka queue. The queue is populated by a simple Java producer. For clarity, I am sharing the three components:

Avro schema file
{
  "namespace": "example.avro",
  "type": "record",
  "name": "User",
  "fields": [
    {"name": "name", "type": "string"},
    {"name": "favorite_number", "type": ["int", "null"]},
    {"name": "favorite_color", "type": ["string", "null"]}
  ]
}
Java producer code snippet (User.class was generated with avro-tools)
User user1 = new User();
user1.setName("Alyssa");
user1.setFavoriteNumber(256);
user1.setFavoriteColor("blue");
String topic = "MemoryTest";
// Properties set in 'props'
KafkaProducer<Message, byte[]> producer = new KafkaProducer<Message, byte[]>(props);
ByteArrayOutputStream out = new ByteArrayOutputStream();
DatumWriter<User> writer = new SpecificDatumWriter<User>(User.class);
Encoder encoder = EncoderFactory.get().binaryEncoder(out, null);
writer.write(user1, encoder);
encoder.flush();
out.close();
byte[] serializedBytes = out.toByteArray();
producer.send(new ProducerRecord<Message, byte[]>(topic, serializedBytes));
Logstash configuration file
input {
  kafka {
    zk_connect => "localhost:2181"
    topic_id => "MemoryTest"
    type => "standard_event"
    group_id => "butiline_dash_prod"
    reset_beginning => true
    auto_offset_reset => smallest
    codec => {
      avro => {
        schema_uri => "/opt/ELK/logstash-1.5.4/bin/user.avsc"
      }
    }
  }
}
output {
  stdout {
    codec => rubydebug
  }
}
Problem
The pipeline fails at the Logstash stage. When a new event is pushed into Kafka, I get the following message on the Logstash console:
Alyssa�blue {:exception=>#<NoMethodError: undefined method `decode' for ["avro", {"schema_uri"=>"/opt/ELK/logstash-1.5.4/bin/user.avsc"}]:Array>, :backtrace=>["/opt/ELK/logstash-1.5.4/vendor/bundle/jruby/1.9/gems/logstash-input-kafka-1.0.0/lib/logstash/inputs/kafka.rb:169:in `queue_event'", "/opt/ELK/logstash-1.5.4/vendor/bundle/jruby/1.9/gems/logstash-input-kafka-1.0.0/lib/logstash/inputs/kafka.rb:139:in `run'", "/opt/ELK/logstash-1.5.4/vendor/bundle/jruby/1.9/gems/logstash-core-1.5.4-java/lib/logstash/pipeline.rb:177:in `inputworker'", "/opt/ELK/logstash-1.5.4/vendor/bundle/jruby/1.9/gems/logstash-core-1.5.4-java/lib/logstash/pipeline.rb:171:in `start_input'"], :level=>:error}
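As an aside, the "Alyssa�blue" prefix on that line is just the raw Avro binary payload echoed to the console: Avro's binary encoding writes string bytes verbatim, prefixed by zigzag-varint lengths and union branch indexes, so the readable fragments survive while the length and branch bytes render as garbage. A minimal, dependency-free sketch (hand-rolled helpers for illustration, not Avro's own API) of how the User record above is laid out on the wire:

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;

// Illustrative only: hand-rolls Avro's binary encoding for the User record
// above with the JDK alone. Real producers should keep using SpecificDatumWriter.
public class AvroWireDemo {

    // Avro encodes ints/longs as zigzag followed by base-128 varint.
    static void writeZigzagVarint(ByteArrayOutputStream out, long n) {
        long z = (n << 1) ^ (n >> 63);           // zigzag: small magnitudes -> small codes
        while ((z & ~0x7FL) != 0) {
            out.write((int) ((z & 0x7F) | 0x80)); // 7 payload bits + continuation bit
            z >>>= 7;
        }
        out.write((int) z);
    }

    // Avro strings are a varint byte length followed by raw UTF-8 bytes.
    static void writeString(ByteArrayOutputStream out, String s) {
        byte[] data = s.getBytes(StandardCharsets.UTF_8);
        writeZigzagVarint(out, data.length);
        out.write(data, 0, data.length);
    }

    public static byte[] encodeUser() {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        writeString(out, "Alyssa");        // "name": string
        writeZigzagVarint(out, 0);         // union branch 0 of ["int", "null"]
        writeZigzagVarint(out, 256);       // "favorite_number": int
        writeZigzagVarint(out, 0);         // union branch 0 of ["string", "null"]
        writeString(out, "blue");          // "favorite_color": string
        return out.toByteArray();
    }

    public static void main(String[] args) {
        byte[] b = encodeUser();
        System.out.println(b.length + " bytes"); // 16 bytes total
    }
}
```

The 16 bytes come out as 0x0C "Alyssa" 0x00 0x80 0x04 0x00 0x08 "blue", which is exactly the readable-text-plus-garbage string the console shows before the exception.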
Finally found the error. Instead of this (as suggested on the Logstash site - https://www.elastic.co/guide/en/logstash/current/plugins-codecs-avro.html):
codec => {
  avro => {
    schema_uri => "/opt/ELK/logstash-1.5.4/bin/user.avsc"
  }
}
the correct syntax is (as suggested in the plugin documentation, https://github.com/logstash-plugins/logstash-codec-avro/blob/master/DEVELOPER.md):
codec => avro {
  schema_uri => "/opt/ELK/logstash-1.5.4/bin/user.avsc"
}
I guess the syntax has changed.