Lift-Json 从 JSON 对象中提取
Lift-Json Extracting from JSON object
我有以下代码:
object Test {
def main(args: Array[String]) {
val sparkConf = new SparkConf().setAppName("Spark").setMaster("local[2]")
val sc = new SparkContext(sparkConf)
val ssc = new StreamingContext(sc, Seconds(3))
val kafkaBrokers = Map("metadata.broker.list" -> "HostName:9092")
val offsetMap = Map(TopicAndPartition("topic_test", 0), 8)
val lines = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaBrokers, offsetMap)
var offsetArray = Array[OffsetRange]()
lines.transform {rdd =>
offsetArray = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
rdd
}.map {
_.message()
}.foreachRDD {rdd =>
/* NEW CODE */
}
ssc.start()
ssc.awaitTermination()
}
}
我已经在评论 /* NEW CODE */
下添加了新代码。我的问题是 val
行将包含一系列 RDD,这些 RDD 基本上每 3 秒形成一次 kafka 服务器。然后我使用地图功能抓取消息。
但是我对 foreachRDD
函数的作用有点困惑。这是否会遍历 lines DStream
中的所有 RDD's
(我正在尝试这样做)?问题是 lift-json
库中的解析函数只接受一个字符串,所以我需要遍历所有 rdd 并将该字符串值传递给解析函数,这正是我试图做的。 但是由于某种原因没有打印出来。
如果您想从特定偏移量读取数据,则说明您使用了错误的重载。
你需要的是这个:
createDirectStream[K,
V,
KD <: Decoder[K],
VD <: Decoder[V], R]
(ssc: StreamingContext,
kafkaParams: Map[String, String],
fromOffsets: Map[TopicAndPartition, Long],
messageHandler: (MessageAndMetadata[K, V]) ⇒ R): InputDStream[R]
你需要 Map[TopicAndPartition, Long]
:
val offsetMap = Map(TopicAndPartition("topic_test", 0), 8L)
并且您需要传递一个接收 MessageAndMetadata[K, V]
和 returns 所需类型的函数,例如:
val extractKeyValue: MessageAndMetadata[String, String] => (String, String) =
msgAndMeta => (msgAndMeta.key(), msgAndMeta.message())
并使用它:
KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder]
(ssc, kafkaBrokers, offsetMap, extractKeyValue)
我有以下代码:
object Test {
def main(args: Array[String]) {
val sparkConf = new SparkConf().setAppName("Spark").setMaster("local[2]")
val sc = new SparkContext(sparkConf)
val ssc = new StreamingContext(sc, Seconds(3))
val kafkaBrokers = Map("metadata.broker.list" -> "HostName:9092")
val offsetMap = Map(TopicAndPartition("topic_test", 0), 8)
val lines = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaBrokers, offsetMap)
var offsetArray = Array[OffsetRange]()
lines.transform {rdd =>
offsetArray = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
rdd
}.map {
_.message()
}.foreachRDD {rdd =>
/* NEW CODE */
}
ssc.start()
ssc.awaitTermination()
}
}
我已经在评论 /* NEW CODE */
下添加了新代码。我的问题是 val
行将包含一系列 RDD,这些 RDD 基本上每 3 秒形成一次 kafka 服务器。然后我使用地图功能抓取消息。
但是我对 foreachRDD
函数的作用有点困惑。这是否会遍历 lines DStream
中的所有 RDD's
(我正在尝试这样做)?问题是 lift-json
库中的解析函数只接受一个字符串,所以我需要遍历所有 rdd 并将该字符串值传递给解析函数,这正是我试图做的。 但是由于某种原因没有打印出来。
如果您想从特定偏移量读取数据,则说明您使用了错误的重载。
你需要的是这个:
createDirectStream[K,
V,
KD <: Decoder[K],
VD <: Decoder[V], R]
(ssc: StreamingContext,
kafkaParams: Map[String, String],
fromOffsets: Map[TopicAndPartition, Long],
messageHandler: (MessageAndMetadata[K, V]) ⇒ R): InputDStream[R]
你需要 Map[TopicAndPartition, Long]
:
val offsetMap = Map(TopicAndPartition("topic_test", 0), 8L)
并且您需要传递一个接收 MessageAndMetadata[K, V]
和 returns 所需类型的函数,例如:
val extractKeyValue: MessageAndMetadata[String, String] => (String, String) =
msgAndMeta => (msgAndMeta.key(), msgAndMeta.message())
并使用它:
KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder]
(ssc, kafkaBrokers, offsetMap, extractKeyValue)