有没有办法在 Kafka 消费者中指定多个解码器(或每个主题一个)?还有其他人觉得需要这个吗?

Is there a way to specify multiple Decoders (or one per Topic) in a Kafka Consumer? Anyone else felt need for this?

我正在使用

在 Scala (ref) 中通过 Kafka 工作进行 Spark Streaming
public static <K,V,U extends kafka.serializer.Decoder<?>,T extends kafka.serializer.Decoder<?>> ReceiverInputDStream<scala.Tuple2<K,V>> createStream(StreamingContext ssc, scala.collection.immutable.Map<String,String> kafkaParams, scala.collection.immutable.Map<String,Object> topics, StorageLevel storageLevel, scala.reflect.ClassTag<K> evidence, scala.reflect.ClassTag<V> evidence, scala.reflect.ClassTag<U> evidence, scala.reflect.ClassTag<T> evidence)

我想在相同的 DStream 和底层 RDD 每个批次间隔中接收不同类型的消息(需要不同的解码器)。我将收听多个主题,每个主题将对应一种消息类型,因此需要自己的 Decoder。目前似乎没有办法为每个主题提供 kafka.serializer.Decoder<?>(有吗?)。人们似乎很可能会针对每个主题发送不同类型的消息(protobuf 序列化字节?)。还有其他人 运行 关注这个问题吗?

谢谢。

C.

似乎 topichere 中某处的 valueDecoder 的映射可能会有所帮助。

我认为,您需要两个 DStream,每个主题一个。然后您将能够执行 join 或 union 以获得包含所有元素的单个 dstream。

使用 createDirectStream api,它使您可以通过 HasOffsetRanges 在每个分区的基础上访问主题。对于 kafka 解码器,使用 DefaultDecoder 获取每条消息的字节数组。

然后在 mapPartitions 中进行实际解码,在其中匹配主题名称以确定如何解释字节数组。

http://spark.apache.org/docs/latest/streaming-kafka-integration.html