如何在启动Spark Streaming进程时加载历史数据,并计算运行个聚合
How to load history data when starting Spark Streaming process, and calculate running aggregations
我的 ElasticSearch 集群中有一些与销售相关的 JSON 数据,我想使用 Spark Streaming(使用 Spark 1.4.1)通过 Kafka 动态聚合来自我的电子商务网站的传入销售事件,了解用户的总销售额(在收入和产品方面)。
从我阅读的文档中我不太清楚的是我如何在 Spark 应用程序启动时从 ElasticSearch 加载历史数据,并计算例如每个用户的总收入(基于历史,以及来自 Kafka 的销售收入)。
我有以下(工作)代码连接到我的 Kafka 实例并接收 JSON 文档:
import kafka.serializer.StringDecoder
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka._
import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.sql.SQLContext
object ReadFromKafka {
def main(args: Array[String]) {
val checkpointDirectory = "/tmp"
val conf = new SparkConf().setAppName("Read Kafka JSONs").setMaster("local[2]")
val topicsSet = Array("tracking").toSet
val sc = new SparkContext(conf)
val ssc = new StreamingContext(sc, Seconds(10))
// Create direct kafka stream with brokers and topics
val kafkaParams = Map[String, String]("metadata.broker.list" -> "localhost:9092")
val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
ssc, kafkaParams, topicsSet)
//Iterate
messages.foreachRDD { rdd =>
//If data is present, continue
if (rdd.count() > 0) {
//Create SQLContect and parse JSON
val sqlContext = new SQLContext(sc)
val trackingEvents = sqlContext.read.json(rdd.values)
//Sample aggregation of incoming data
trackingEvents.groupBy("type").count().show()
}
}
// Start the computation
ssc.start()
ssc.awaitTermination()
}
}
我知道有一个ElasticSearch的插件(https://www.elastic.co/guide/en/elasticsearch/hadoop/master/spark.html#spark-read),但是我不太清楚如何整合启动时的读取,以及流式计算过程来聚合历史数据和流式数据.
非常感谢您的帮助!提前致谢。
RDD 是不可变的,因此在创建后您无法向其添加数据,例如使用新事件更新收入。
你可以做的是将现有数据与新事件结合起来创建一个新的 RDD,然后你可以将其用作当前总数。例如...
var currentTotal: RDD[(Key, Value)] = ... //read from ElasticSearch
messages.foreachRDD { rdd =>
currentTotal = currentTotal.union(rdd)
}
在这种情况下,我们将 currentTotal
设为 var
,因为当它与传入数据联合时,它将被对新 RDD 的引用所取代。
合并后您可能想要执行一些进一步的操作,例如减少属于同一键的值,但您明白了。
如果您使用此技术,请注意您的 RDD 的沿袭将会增长,因为每个新创建的 RDD 都将引用其父级。这可能会导致堆栈溢出样式沿袭问题。要解决此问题,您可以定期在 RDD 上调用 checkpoint()
。
我的 ElasticSearch 集群中有一些与销售相关的 JSON 数据,我想使用 Spark Streaming(使用 Spark 1.4.1)通过 Kafka 动态聚合来自我的电子商务网站的传入销售事件,了解用户的总销售额(在收入和产品方面)。
从我阅读的文档中我不太清楚的是我如何在 Spark 应用程序启动时从 ElasticSearch 加载历史数据,并计算例如每个用户的总收入(基于历史,以及来自 Kafka 的销售收入)。
我有以下(工作)代码连接到我的 Kafka 实例并接收 JSON 文档:
import kafka.serializer.StringDecoder
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka._
import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.sql.SQLContext
object ReadFromKafka {
def main(args: Array[String]) {
val checkpointDirectory = "/tmp"
val conf = new SparkConf().setAppName("Read Kafka JSONs").setMaster("local[2]")
val topicsSet = Array("tracking").toSet
val sc = new SparkContext(conf)
val ssc = new StreamingContext(sc, Seconds(10))
// Create direct kafka stream with brokers and topics
val kafkaParams = Map[String, String]("metadata.broker.list" -> "localhost:9092")
val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
ssc, kafkaParams, topicsSet)
//Iterate
messages.foreachRDD { rdd =>
//If data is present, continue
if (rdd.count() > 0) {
//Create SQLContect and parse JSON
val sqlContext = new SQLContext(sc)
val trackingEvents = sqlContext.read.json(rdd.values)
//Sample aggregation of incoming data
trackingEvents.groupBy("type").count().show()
}
}
// Start the computation
ssc.start()
ssc.awaitTermination()
}
}
我知道有一个ElasticSearch的插件(https://www.elastic.co/guide/en/elasticsearch/hadoop/master/spark.html#spark-read),但是我不太清楚如何整合启动时的读取,以及流式计算过程来聚合历史数据和流式数据.
非常感谢您的帮助!提前致谢。
RDD 是不可变的,因此在创建后您无法向其添加数据,例如使用新事件更新收入。
你可以做的是将现有数据与新事件结合起来创建一个新的 RDD,然后你可以将其用作当前总数。例如...
var currentTotal: RDD[(Key, Value)] = ... //read from ElasticSearch
messages.foreachRDD { rdd =>
currentTotal = currentTotal.union(rdd)
}
在这种情况下,我们将 currentTotal
设为 var
,因为当它与传入数据联合时,它将被对新 RDD 的引用所取代。
合并后您可能想要执行一些进一步的操作,例如减少属于同一键的值,但您明白了。
如果您使用此技术,请注意您的 RDD 的沿袭将会增长,因为每个新创建的 RDD 都将引用其父级。这可能会导致堆栈溢出样式沿袭问题。要解决此问题,您可以定期在 RDD 上调用 checkpoint()
。