KafkaUtils API |抵消管理 |火花流
KafkaUtils API | offset management | Spark Streaming
我正在尝试为 exactly once 语义管理 kafka 偏移量。
使用偏移贴图创建直接流时遇到如下问题:
val fromOffsets : (TopicAndPartition, Long) = TopicAndPartition(metrics_rs.getString(1), metrics_rs.getInt(2)) -> metrics_rs.getLong(3)
KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder,(String, String)] (ssc,kafkaParams,fromOffsets,messageHandler)
这里,
val messageHandler =
(mmd: MessageAndMetadata[String, String]) => mmd.message.length
和
metrics_rs = metricsStatement.executeQuery("SELECT part,off from metrics.txn_offsets where topic='"+t+''' )
我想我在声明样式上做错了...如果你能帮忙的话。
编译错误说 "too many type arguments for createDirectStream"
我发现你做错了几件事。
您需要通过一个 Map[TopicAndPartition, Long]
,而目前您有一个 Tuple2[TopicAndPartition, Long]
。所以你需要:
val fromOffsets: Map[TopicAndPartition, Long] =
Map(TopicAndPartition(metrics_rs.getString(1),
metrics_rs.getInt(2)) -> metrics_rs.getLong(3))
你说 createDirectStream
中的 return 类型是 (String, String)
类型的元组,但你的 messageHandler
值是 Int
。如果你想return一个键值对的元组,你需要:
val messageHandler: MessageAndMetadata[String, String] => (String, String) =
(mmd: MessageAndMetadata[String, String]) => (mmd.key(), mmd.message())
修复后,应该可以编译:
val stream = KafkaUtils
.createDirectStream[String, String,
StringDecoder, StringDecoder,
(String, String)] (ssc,
kafkaParams,
fromOffsets,
messageHandler)
我正在尝试为 exactly once 语义管理 kafka 偏移量。
使用偏移贴图创建直接流时遇到如下问题:
val fromOffsets : (TopicAndPartition, Long) = TopicAndPartition(metrics_rs.getString(1), metrics_rs.getInt(2)) -> metrics_rs.getLong(3)
KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder,(String, String)] (ssc,kafkaParams,fromOffsets,messageHandler)
这里,
val messageHandler =
(mmd: MessageAndMetadata[String, String]) => mmd.message.length
和
metrics_rs = metricsStatement.executeQuery("SELECT part,off from metrics.txn_offsets where topic='"+t+''' )
我想我在声明样式上做错了...如果你能帮忙的话。 编译错误说 "too many type arguments for createDirectStream"
我发现你做错了几件事。
您需要通过一个 Map[TopicAndPartition, Long]
,而目前您有一个 Tuple2[TopicAndPartition, Long]
。所以你需要:
val fromOffsets: Map[TopicAndPartition, Long] =
Map(TopicAndPartition(metrics_rs.getString(1),
metrics_rs.getInt(2)) -> metrics_rs.getLong(3))
你说 createDirectStream
中的 return 类型是 (String, String)
类型的元组,但你的 messageHandler
值是 Int
。如果你想return一个键值对的元组,你需要:
val messageHandler: MessageAndMetadata[String, String] => (String, String) =
(mmd: MessageAndMetadata[String, String]) => (mmd.key(), mmd.message())
修复后,应该可以编译:
val stream = KafkaUtils
.createDirectStream[String, String,
StringDecoder, StringDecoder,
(String, String)] (ssc,
kafkaParams,
fromOffsets,
messageHandler)