KafkaUtils API | offset management | Spark Streaming

I'm trying to manage Kafka offsets myself to get exactly-once semantics.

I'm running into the following problem when creating a direct stream with an offset map:

val fromOffsets : (TopicAndPartition, Long) = TopicAndPartition(metrics_rs.getString(1), metrics_rs.getInt(2)) -> metrics_rs.getLong(3)

KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder,(String, String)] (ssc,kafkaParams,fromOffsets,messageHandler)

Here,

val messageHandler =
      (mmd: MessageAndMetadata[String, String]) => mmd.message.length

metrics_rs = metricsStatement.executeQuery("SELECT part,off from metrics.txn_offsets where topic='" + t + "'")

I think I'm getting the declaration style wrong... any help would be appreciated. The compile error says "too many type arguments for createDirectStream".

I see a few things you're doing wrong.

You need to pass a Map[TopicAndPartition, Long], but what you currently have is a Tuple2[TopicAndPartition, Long]. So you need:

val fromOffsets: Map[TopicAndPartition, Long] = 
    Map(TopicAndPartition(metrics_rs.getString(1), 
                          metrics_rs.getInt(2)) -> metrics_rs.getLong(3))
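The map above holds a single entry, because it only reads the current row. Below is a minimal sketch of the type fix plus building the map for every stored partition. It assumes a stand-in case class with the same shape as Kafka 0.8's `kafka.common.TopicAndPartition` (so it runs without Kafka on the classpath), a hypothetical topic name, and hypothetical `(part, off)` rows in place of the JDBC result set. With a real `java.sql.ResultSet` you would collect the rows in a `while (metrics_rs.next())` loop first.

```scala
// Hypothetical stand-in with the same shape as kafka.common.TopicAndPartition,
// so this sketch compiles and runs without Kafka.
final case class TopicAndPartition(topic: String, partition: Int)

object FromOffsetsSketch {
  val t = "metrics" // hypothetical topic name

  // `a -> b` builds a Tuple2, which is what the question's annotation declared.
  val single: (TopicAndPartition, Long) = TopicAndPartition(t, 0) -> 42L

  // Wrapping the pair in Map(...) gives the Map[TopicAndPartition, Long] type.
  val asMap: Map[TopicAndPartition, Long] = Map(single)

  // Hypothetical (part, off) rows, standing in for what
  // "SELECT part,off from metrics.txn_offsets where topic=..." returns.
  val rows: Seq[(Int, Long)] = Seq((0, 42L), (1, 17L), (2, 99L))

  // One map entry per stored partition of topic t.
  val fromOffsets: Map[TopicAndPartition, Long] =
    rows.map { case (partition, offset) =>
      TopicAndPartition(t, partition) -> offset
    }.toMap
}
```

Building a sequence of pairs and calling `.toMap` keeps the element type explicit, so a wrong annotation shows up at the `val` rather than inside the `createDirectStream` overload resolution.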

You declare the return type in createDirectStream as a tuple of type (String, String), but your messageHandler returns an Int. If you want to return a key-value tuple, you need:

val messageHandler: MessageAndMetadata[String, String] => (String, String) =
  (mmd: MessageAndMetadata[String, String]) => (mmd.key(), mmd.message())
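To see why the fifth type argument and the handler have to agree, here is a minimal sketch using a hypothetical stand-in for `kafka.message.MessageAndMetadata`, reduced to `key` and `message` as plain fields (the real class exposes them as methods, which is why the answer calls `key()` and `message()`). The handler's function type fixes the result type: the original handler is `MessageAndMetadata[String, String] => Int`, so it cannot be passed where `(String, String)` was declared.

```scala
// Hypothetical stand-in for kafka.message.MessageAndMetadata[K, V], keeping
// only the key and message so the sketch runs without Kafka.
final case class MessageAndMetadata[K, V](key: K, message: V)

object MessageHandlerSketch {
  // The question's handler: its result type is Int, so the last type
  // argument would have to be Int, not (String, String).
  val lengthHandler: MessageAndMetadata[String, String] => Int =
    (mmd: MessageAndMetadata[String, String]) => mmd.message.length

  // The fixed handler: its result type is (String, String), matching the
  // fifth type argument given to createDirectStream.
  val messageHandler: MessageAndMetadata[String, String] => (String, String) =
    (mmd: MessageAndMetadata[String, String]) => (mmd.key, mmd.message)
}
```

Annotating the handler's full function type, as the answer does, makes the compiler check the result type where the handler is defined.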

With those fixes, it should compile:

val stream = KafkaUtils
              .createDirectStream[String, String,
                      StringDecoder, StringDecoder,
                      (String, String)] (ssc, 
                                         kafkaParams, 
                                         fromOffsets, 
                                         messageHandler)