通过 Apache Camel 使用来自 Kafka 的 Avro 消息

Consume Avro message from Kafka via Apache Camel

我有一个 Apache Camel 路由将 AVRO 消息发布到 Apache Kafka 主题上。我只是在设置生产者 属性 'serializerClass=kafka.serializer.StringEncoder' 时才让它起作用。否则我得到

java.lang.ClassCastException: java.lang.String cannot be cast to [B at kafka.serializer.DefaultEncoder.toBytes(Encoder.scala:34) at kafka.producer.async.DefaultEventHandler$$anonfun$serialize.apply(DefaultEventHandler.scala:130) at kafka.producer.async.DefaultEventHandler$$anonfun$serialize.apply(DefaultEventHandler.scala:125) at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33) at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:34) at kafka.producer.async.DefaultEventHandler.serialize(DefaultEventHandler.scala:125) at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:52) at kafka.producer.Producer.send(Producer.scala:77) at kafka.javaapi.producer.Producer.send(Producer.scala:33) at org.apache.camel.component.kafka.KafkaProducer.process(KafkaProducer.java:84)

在另一端,我有第二个 Apache Camel 路由应该从上面的主题中使用,但失败了

java.io.IOException: Invalid long encoding at org.apache.avro.io.BinaryDecoder.innerLongDecode(BinaryDecoder.java:217) at org.apache.avro.io.BinaryDecoder.readLong(BinaryDecoder.java:176) at org.apache.avro.io.ResolvingDecoder.readLong(ResolvingDecoder.java:162) at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:160) at org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:193) at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:183) at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:151) at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:142) at org.apache.camel.dataformat.avro.AvroDataFormat.unmarshal(AvroDataFormat.java:133) at org.apache.camel.processor.UnmarshalProcessor.process(UnmarshalProcessor.java:67)

这是我使用的 Apache Camel 消费者代码:

        <route id="cassandra.publisher">
            <from
                uri="{{kafka.base.uri}}&amp;topic=sensordata&amp;groupId=Cassandra_ConsumerGroup&amp;consumerId=CassandraConsumer_Instance_1&amp;clientId=adapter2" />      
            <unmarshal>
                <custom ref="avroSensorData" />
            </unmarshal>

http://camel.465427.n5.nabble.com/Camel-Kafka-Component-td5749525.html#a5769561

描述 Apache Camel 版本 2.16.0/2.15.3 将支持各种数据类型,而不仅仅是字符串消息。

正如所承诺的那样,此问题已通过 Apache Camel 2.15.3 和 CAMEL-8790 (https://issues.apache.org/jira/browse/CAMEL-8790) 得到修复。

为了解决这个问题,你必须为camel kafka消费者提供keyDeserializer和valueDeserializer,如下所示:

&keyDeserializer=org.apache.kafka.common.serialization.StringDeserializer&valueDeserializer=org.apache.kafka.common.serialization.ByteArrayDeserializer