Spark Streaming throughput monitoring
Is there a way to monitor the input and output throughput of a Spark cluster, to make sure the cluster is not flooded and overflowed by incoming data?

In my case, I set up the Spark cluster on AWS EC2, so I am thinking of using AWS CloudWatch to monitor NetworkIn and NetworkOut for each node in the cluster.

But my idea seems inaccurate: network traffic is not limited to data coming in for Spark; it may also include other traffic.

Is there a tool or method that specifically monitors the streaming-data status of a Spark cluster? Or is there already a built-in tool in Spark that I missed?

Update: Spark 1.4 has been released, and the monitoring UI on port 4040 has been significantly enhanced with graphical displays.
Spark has a configurable metrics subsystem. By default it publishes a JSON version of the registered metrics at <driver>:<port>/metrics/json. Other metrics sinks can be configured, such as Ganglia, CSV files, or JMX.

You will need some external monitoring system that collects the metrics on a regular basis and helps you make sense of them. (n.b. we use Ganglia, but there are other open-source and commercial options.)
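As a rough sketch of what that configuration looks like: sinks are declared in conf/metrics.properties on the driver and executors. The example below enables the CSV and JMX sinks for all instances (the reporting period and output directory are assumptions, not defaults):

# conf/metrics.properties -- illustrative values, adjust to your setup
# Dump all registered metrics to CSV files every 10 seconds
*.sink.csv.class=org.apache.spark.metrics.sink.CsvSink
*.sink.csv.period=10
*.sink.csv.unit=seconds
*.sink.csv.directory=/tmp/spark-metrics
# Also expose the same metrics over JMX
*.sink.jmx.class=org.apache.spark.metrics.sink.JmxSink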
Spark Streaming publishes several metrics that can be used to monitor the performance of your job. To estimate throughput you can combine:

(lastReceivedBatch_processingEndTime - lastReceivedBatch_processingStartTime) / lastReceivedBatch_records

(this yields the processing time per record; its inverse is records per unit of time). For all the metrics supported, have a look at StreamingSource.
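A minimal sketch of how one might poll that endpoint and derive the figure above, assuming the default driver UI port 4040 and the JSON layout shown later in this answer (the object name and regex-based extraction are illustrative, not part of Spark's API):

import scala.io.Source

object ThroughputProbe {
  // Matches gauge entries like "...lastReceivedBatch_records": { "value": 0
  private val gauge =
    """"[^"]*lastReceivedBatch_(\w+)"\s*:\s*\{\s*"value"\s*:\s*(\d+)""".r

  def main(args: Array[String]): Unit = {
    // Default driver UI port; adjust if spark.ui.port is set differently
    val json = Source.fromURL("http://localhost:4040/metrics/json").mkString
    val values = gauge.findAllMatchIn(json)
      .map(m => m.group(1) -> m.group(2).toLong)
      .toMap
    for {
      start   <- values.get("processingStartTime")
      end     <- values.get("processingEndTime")
      records <- values.get("records") if records > 0
    } println(s"last batch: $records records, " +
              s"${(end - start).toDouble / records} ms per record")
  }
}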
Example: launching a local REPL with Spark 1.3.1 and executing a trivial streaming application:
import org.apache.spark.streaming._

// 10-second batch interval on the REPL's existing SparkContext (sc)
val ssc = new StreamingContext(sc, Seconds(10))
// Turn a queue of RDDs into a DStream; each batch dequeues one RDD
val queue = scala.collection.mutable.Queue(1, 2, 3, 45, 6, 6, 7, 18, 9, 10, 11)
val q = queue.map(elem => sc.parallelize(Seq(elem)))
val dstream = ssc.queueStream(q)
dstream.print()
ssc.start()  // in a standalone app, follow with ssc.awaitTermination()
one can GET localhost:4040/metrics/json, which returns:
{
  "version": "3.0.0",
  "gauges": {
    "local-1430558777965.<driver>.BlockManager.disk.diskSpaceUsed_MB": { "value": 0 },
    "local-1430558777965.<driver>.BlockManager.memory.maxMem_MB": { "value": 2120 },
    "local-1430558777965.<driver>.BlockManager.memory.memUsed_MB": { "value": 0 },
    "local-1430558777965.<driver>.BlockManager.memory.remainingMem_MB": { "value": 2120 },
    "local-1430558777965.<driver>.DAGScheduler.job.activeJobs": { "value": 0 },
    "local-1430558777965.<driver>.DAGScheduler.job.allJobs": { "value": 6 },
    "local-1430558777965.<driver>.DAGScheduler.stage.failedStages": { "value": 0 },
    "local-1430558777965.<driver>.DAGScheduler.stage.runningStages": { "value": 0 },
    "local-1430558777965.<driver>.DAGScheduler.stage.waitingStages": { "value": 0 },
    "local-1430558777965.<driver>.Spark shell.StreamingMetrics.streaming.lastCompletedBatch_processingDelay": { "value": 44 },
    "local-1430558777965.<driver>.Spark shell.StreamingMetrics.streaming.lastCompletedBatch_processingEndTime": { "value": 1430559950044 },
    "local-1430558777965.<driver>.Spark shell.StreamingMetrics.streaming.lastCompletedBatch_processingStartTime": { "value": 1430559950000 },
    "local-1430558777965.<driver>.Spark shell.StreamingMetrics.streaming.lastCompletedBatch_schedulingDelay": { "value": 0 },
    "local-1430558777965.<driver>.Spark shell.StreamingMetrics.streaming.lastCompletedBatch_submissionTime": { "value": 1430559950000 },
    "local-1430558777965.<driver>.Spark shell.StreamingMetrics.streaming.lastCompletedBatch_totalDelay": { "value": 44 },
    "local-1430558777965.<driver>.Spark shell.StreamingMetrics.streaming.lastReceivedBatch_processingEndTime": { "value": 1430559950044 },
    "local-1430558777965.<driver>.Spark shell.StreamingMetrics.streaming.lastReceivedBatch_processingStartTime": { "value": 1430559950000 },
    "local-1430558777965.<driver>.Spark shell.StreamingMetrics.streaming.lastReceivedBatch_records": { "value": 0 },
    "local-1430558777965.<driver>.Spark shell.StreamingMetrics.streaming.lastReceivedBatch_submissionTime": { "value": 1430559950000 },
    "local-1430558777965.<driver>.Spark shell.StreamingMetrics.streaming.receivers": { "value": 0 },
    "local-1430558777965.<driver>.Spark shell.StreamingMetrics.streaming.retainedCompletedBatches": { "value": 2 },
    "local-1430558777965.<driver>.Spark shell.StreamingMetrics.streaming.runningBatches": { "value": 0 },
    "local-1430558777965.<driver>.Spark shell.StreamingMetrics.streaming.totalCompletedBatches": { "value": 2 },
    "local-1430558777965.<driver>.Spark shell.StreamingMetrics.streaming.totalProcessedRecords": { "value": 0 },
    "local-1430558777965.<driver>.Spark shell.StreamingMetrics.streaming.totalReceivedRecords": { "value": 0 },
    "local-1430558777965.<driver>.Spark shell.StreamingMetrics.streaming.unprocessedBatches": { "value": 0 },
    "local-1430558777965.<driver>.Spark shell.StreamingMetrics.streaming.waitingBatches": { "value": 0 }
  },
  "counters": { },
  "histograms": { },
  "meters": { },
  "timers": { }
}
I recommend using Spark's metrics support (https://spark.apache.org/docs/latest/monitoring.html#metrics) with Prometheus (https://prometheus.io/).

Prometheus can capture the metrics generated by Spark's metrics system, and it also provides a UI. Prometheus is a free tool.
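As a rough sketch of the wiring (both snippets are assumptions about your setup, not drop-in files): Spark 3.0 and later ship a native PrometheusServlet sink that exposes driver metrics in Prometheus' text format; on older versions you would route through a JMX or Graphite exporter instead.

# conf/metrics.properties -- requires Spark 3.0+; the servlet path is an assumption
*.sink.prometheusServlet.class=org.apache.spark.metrics.sink.PrometheusServlet
*.sink.prometheusServlet.path=/metrics/prometheus

# prometheus.yml -- scrape the driver UI port (job name and target are assumptions)
scrape_configs:
  - job_name: spark-driver
    metrics_path: /metrics/prometheus
    static_configs:
      - targets: ['localhost:4040']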