Consume Avro message from Kafka via Apache Camel
I have an Apache Camel route that publishes Avro messages to an Apache Kafka topic. I only got it to work by setting the producer property 'serializerClass=kafka.serializer.StringEncoder'. Otherwise I get:
java.lang.ClassCastException: java.lang.String cannot be cast to [B
    at kafka.serializer.DefaultEncoder.toBytes(Encoder.scala:34)
    at kafka.producer.async.DefaultEventHandler$$anonfun$serialize.apply(DefaultEventHandler.scala:130)
    at kafka.producer.async.DefaultEventHandler$$anonfun$serialize.apply(DefaultEventHandler.scala:125)
    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
    at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:34)
    at kafka.producer.async.DefaultEventHandler.serialize(DefaultEventHandler.scala:125)
    at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:52)
    at kafka.producer.Producer.send(Producer.scala:77)
    at kafka.javaapi.producer.Producer.send(Producer.scala:33)
    at org.apache.camel.component.kafka.KafkaProducer.process(KafkaProducer.java:84)
On the other end, I have a second Apache Camel route that should consume from the topic above, but it fails with:
java.io.IOException: Invalid long encoding
    at org.apache.avro.io.BinaryDecoder.innerLongDecode(BinaryDecoder.java:217)
    at org.apache.avro.io.BinaryDecoder.readLong(BinaryDecoder.java:176)
    at org.apache.avro.io.ResolvingDecoder.readLong(ResolvingDecoder.java:162)
    at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:160)
    at org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:193)
    at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:183)
    at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:151)
    at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:142)
    at org.apache.camel.dataformat.avro.AvroDataFormat.unmarshal(AvroDataFormat.java:133)
    at org.apache.camel.processor.UnmarshalProcessor.process(UnmarshalProcessor.java:67)
This is the Apache Camel consumer code I use:
<route id="cassandra.publisher">
<from
uri="{{kafka.base.uri}}&amp;topic=sensordata&amp;groupId=Cassandra_ConsumerGroup&amp;consumerId=CassandraConsumer_Instance_1&amp;clientId=adapter2" />
<unmarshal>
<custom ref="avroSensorData" />
</unmarshal>
The thread at http://camel.465427.n5.nabble.com/Camel-Kafka-Component-td5749525.html#a5769561 says that Apache Camel 2.16.0/2.15.3 will support various data types, not just String messages.
As promised, this issue has been fixed in Apache Camel 2.15.3 by CAMEL-8790 (https://issues.apache.org/jira/browse/CAMEL-8790).
To solve the problem, you have to provide keyDeserializer and valueDeserializer to the Camel Kafka consumer, as follows:
&keyDeserializer=org.apache.kafka.common.serialization.StringDeserializer&valueDeserializer=org.apache.kafka.common.serialization.ByteArrayDeserializer
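A likely reason the byte-array deserializer matters: Avro's binary encoding is arbitrary bytes, and handling those bytes as a String on the way through Kafka is lossy. The sketch below uses only the JDK to illustrate that effect. The class name, the helper method and the sample bytes are made up for illustration, and the payload is not a real Avro record; UTF-8 is assumed as the charset.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

public class StringRoundTripDemo {

    // Decode the bytes as UTF-8 text and encode them again, which is roughly
    // what happens when a binary payload is treated as a String message.
    static byte[] roundTripThroughString(byte[] payload) {
        String asText = new String(payload, StandardCharsets.UTF_8);
        return asText.getBytes(StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        // Hypothetical binary payload: 0x80 and 0xFF are not valid UTF-8 on
        // their own, so decoding replaces them with U+FFFD.
        byte[] binary = {(byte) 0x80, (byte) 0xFF, 0x02, 0x41};
        byte[] mangled = roundTripThroughString(binary);

        System.out.println("original: " + Arrays.toString(binary));
        System.out.println("mangled:  " + Arrays.toString(mangled));
        System.out.println("intact:   " + Arrays.equals(binary, mangled));
    }
}
```

If the bytes that reach the Avro decoder differ from the bytes that were written, a failure such as "Invalid long encoding" is the kind of error one would expect, which is why passing the value through as a raw byte[] avoids it.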