Running in "deadlock" while doing streaming aggregations from Kafka
I posted a similar question a few days ago:
By now I have at least a "working" solution, meaning that the process itself seems to run correctly. But, as I'm a Spark beginner, I seem to have missed some points on how to build this kind of application the right way (performance-/computation-wise)...
What I want to do:
1. Load history data from ElasticSearch upon application startup
2. Start listening to a Kafka topic on startup (with sale events, passed as JSON strings) using Spark Streaming
3. For each incoming RDD, do an aggregation per user
4. Union the results of 3. with the history
5. Aggregate the new values per user, e.g. total revenue
6. Use the results of 5. as the new "history" for the next iteration
My code is the following:
import kafka.serializer.StringDecoder
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka._
import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.sql.{DataFrame, SaveMode, SQLContext}
import org.elasticsearch.spark.sql._
import org.apache.log4j.Logger
import org.apache.log4j.Level

object ReadFromKafkaAndES {

  def main(args: Array[String]) {

    Logger.getLogger("org").setLevel(Level.WARN)
    Logger.getLogger("akka").setLevel(Level.WARN)
    Logger.getLogger("kafka").setLevel(Level.WARN)

    val checkpointDirectory = "/tmp/Spark"
    val conf = new SparkConf().setAppName("Read Kafka JSONs").setMaster("local[4]")
    conf.set("es.nodes", "localhost")
    conf.set("es.port", "9200")

    val topicsSet = Array("sales").toSet

    val sc = new SparkContext(conf)
    val ssc = new StreamingContext(sc, Seconds(15))
    ssc.checkpoint(checkpointDirectory)

    //Create SQLContext
    val sqlContext = new SQLContext(sc)

    //Get history data from ES
    var history = sqlContext.esDF("data/salesaggregation")

    //Kafka settings
    val kafkaParams = Map[String, String]("metadata.broker.list" -> "localhost:9092")

    // Create direct kafka stream with brokers and topics
    val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, topicsSet)

    //Iterate
    messages.foreachRDD { rdd =>

      //If data is present, continue
      if (rdd.count() > 0) {

        //Register temporary table for the aggregated history
        history.registerTempTable("history")
        println("--- History -------------------------------")
        history.show()

        //Parse JSON as DataFrame
        val saleEvents = sqlContext.read.json(rdd.values)

        //Register temporary table for sales events
        saleEvents.registerTempTable("sales")

        val sales = sqlContext.sql("select userId, cast(max(saleTimestamp) as Timestamp) as latestSaleTimestamp, sum(totalRevenue) as totalRevenue, sum(totalPoints) as totalPoints from sales group by userId")
        println("--- Sales ---------------------------------")
        sales.show()

        val agg = sqlContext.sql("select a.userId, max(a.latestSaleTimestamp) as latestSaleTimestamp, sum(a.totalRevenue) as totalRevenue, sum(a.totalPoints) as totalPoints from ((select userId, latestSaleTimestamp, totalRevenue, totalPoints from history) union all (select userId, cast(max(saleTimestamp) as Timestamp) as latestSaleTimestamp, sum(totalRevenue) as totalRevenue, sum(totalPoints) as totalPoints from sales group by userId)) a group by userId")
        println("--- Aggregation ---------------------------")
        agg.show()

        //This is our new "history"
        history = agg

        //Cache results
        history.cache()

        //Drop temporary table
        sqlContext.dropTempTable("history")
      }
    }

    // Start the computation
    ssc.start()
    ssc.awaitTermination()
  }
}
The computations seem to work correctly:
--- History -------------------------------
+--------------------+--------------------+-----------+------------+------+
| latestSaleTimestamp| productList|totalPoints|totalRevenue|userId|
+--------------------+--------------------+-----------+------------+------+
|2015-07-22 10:03:...|Buffer(47, 1484, ...| 91| 12.05| 23|
|2015-07-22 12:50:...|Buffer(256, 384, ...| 41| 7.05| 24|
+--------------------+--------------------+-----------+------------+------+
--- Sales ---------------------------------
+------+--------------------+------------------+-----------+
|userId| latestSaleTimestamp| totalRevenue|totalPoints|
+------+--------------------+------------------+-----------+
| 23|2015-07-29 09:17:...| 255.59| 208|
| 24|2015-07-29 09:17:...|226.08999999999997| 196|
+------+--------------------+------------------+-----------+
--- Aggregation ---------------------------
+------+--------------------+------------------+-----------+
|userId| latestSaleTimestamp| totalRevenue|totalPoints|
+------+--------------------+------------------+-----------+
| 23|2015-07-29 09:17:...| 267.6400001907349| 299|
| 24|2015-07-29 09:17:...|233.14000019073484| 237|
+------+--------------------+------------------+-----------+
But if the application runs for several iterations, I can see that performance deteriorates:
I also see a large number of skipped tasks, which increases with every iteration:
The graph for the first iteration looks like this:
The graph for the second iteration looks like this:
The more iterations pass, the longer the graph gets and the more steps are skipped.
Basically, I think the problem is how the results of one iteration are stored for the next iteration. Unfortunately, even after trying lots of different things and reading the documentation, I couldn't come up with a solution for this. Any help is greatly appreciated. Thanks!
This streaming job is not in a 'deadlock', but its execution time is increasing exponentially with every iteration, so the streaming job will fail sooner rather than later.
The union->reduce->union->reduce... iterative process on the RDDs creates an ever-increasing RDD lineage. Every iteration adds dependencies to that lineage that have to be computed in the next iteration, which also makes the execution time grow. The dependency (lineage) graph shows this clearly.
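To make the growing lineage visible, you could print the dependency chain of the carried-over DataFrame's underlying RDD once per batch (a small debugging sketch using the standard RDD toDebugString API; history is the variable from the code above):

// inside foreachRDD, after history = agg
println(history.rdd.toDebugString) // the printed dependency chain gets longer with every batch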
One solution is to checkpoint the RDD at regular intervals:
history.checkpoint()
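A minimal sketch of how that could fit into the loop above, assuming Spark 1.x (where a DataFrame has no checkpoint() of its own, so the underlying RDD is checkpointed and the DataFrame is rebuilt from it); the every-10-batches interval is an arbitrary choice:

var batchCount = 0L
messages.foreachRDD { rdd =>
  if (rdd.count() > 0) {
    // ... aggregation as in the question, producing `agg` ...
    history = agg
    history.cache()
    batchCount += 1
    if (batchCount % 10 == 0) {
      val rows = history.rdd             // RDD[Row] behind the DataFrame
      rows.checkpoint()                  // needs a checkpoint dir; ssc.checkpoint(...) above should already set one
      rows.count()                       // force an action so the checkpoint is actually written
      history = sqlContext.createDataFrame(rows, history.schema) // lineage now starts at the checkpoint
    }
  }
}

Checkpointing writes the data out to the checkpoint directory and cuts the lineage there, so each batch only has to recompute the work done since the last checkpoint instead of the whole chain of unions.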
You could also explore replacing the union/reduce process with updateStateByKey.
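For reference, a sketch of what the updateStateByKey route could look like. It assumes the JSON events have already been mapped into (userId, (revenue, points)) pairs; that mapping and the field types are assumptions, not part of the code above. The per-user totals then live as managed streaming state, so there is no history DataFrame whose lineage keeps growing:

import org.apache.spark.streaming.dstream.DStream

// Fold this batch's (revenue, points) values into the running per-user totals.
val updateTotals = (newValues: Seq[(Double, Long)], state: Option[(Double, Long)]) => {
  val (rev, pts) = state.getOrElse((0.0, 0L))
  val summed = newValues.foldLeft((rev, pts)) { case ((r, p), (nr, np)) => (r + nr, p + np) }
  Some(summed)
}

// `salesByUser` is assumed to be a DStream[(Int, (Double, Long))] of (userId, (revenue, points))
// pairs built from the Kafka messages by whatever JSON parsing is already done per batch.
def runningTotals(salesByUser: DStream[(Int, (Double, Long))]): DStream[(Int, (Double, Long))] =
  salesByUser.updateStateByKey[(Double, Long)](updateTotals)

updateStateByKey requires a checkpoint directory, which ssc.checkpoint(...) in the code above already configures. The latest sale timestamp could be carried in the state tuple as well; the sketch keeps only revenue and points to stay short.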