如何检索输出大小和从 Spark UI 写入的记录等指标?
How to retrieve Metrics like Output Size and Records Written from Spark UI?
如何在任务或作业完成后立即在控制台(Spark Shell 或 Spark 提交作业)上收集这些指标。
我们正在使用 Spark 将数据从 Mysql 加载到 Cassandra,它非常庞大(例如:~200 GB 和 6 亿行)。任务完成后,我们想验证 spark 处理了多少行?我们可以从 Spark UI 中获取数字,但是我们如何从 spark shell 或 spark-submit job.
中检索该数字 ("Output Records Written")
从 Mysql 加载到 Cassandra 的示例命令。
val pt = sqlcontext.read.format("jdbc").option("url", "jdbc:mysql://...:3306/...").option("driver", "com.mysql.jdbc.Driver").option("dbtable", "payment_types").option("user", "hadoop").option("password", "...").load()
pt.save("org.apache.spark.sql.cassandra",SaveMode.Overwrite,options = Map( "table" -> "payment_types", "keyspace" -> "test"))
我想检索上述任务的所有 Spark UI 指标,主要是输出大小和写入的记录。
请帮忙。
感谢您的宝贵时间!
找到答案了。您可以使用 SparkListener 获取统计信息。
如果您的作业没有输入或输出指标,您可能会遇到 None.get 异常,您可以通过提供 if stmt.
安全地忽略这些异常
sc.addSparkListener(new SparkListener() {
override def onTaskEnd(taskEnd: SparkListenerTaskEnd) {
val metrics = taskEnd.taskMetrics
if(metrics.inputMetrics != None){
inputRecords += metrics.inputMetrics.get.recordsRead}
if(metrics.outputMetrics != None){
outputWritten += metrics.outputMetrics.get.recordsWritten }
}
})
请找到下面的例子。
import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
import com.datastax.spark.connector._
import org.apache.spark.sql._
import org.apache.spark.storage.StorageLevel
import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}
val conf = new SparkConf()
.set("spark.cassandra.connection.host", "...")
.set("spark.driver.allowMultipleContexts","true")
.set("spark.master","spark://....:7077")
.set("spark.driver.memory","1g")
.set("spark.executor.memory","10g")
.set("spark.shuffle.spill","true")
.set("spark.shuffle.memoryFraction","0.2")
.setAppName("CassandraTest")
sc.stop
val sc = new SparkContext(conf)
val sqlcontext = new org.apache.spark.sql.SQLContext(sc)
var outputWritten = 0L
sc.addSparkListener(new SparkListener() {
override def onTaskEnd(taskEnd: SparkListenerTaskEnd) {
val metrics = taskEnd.taskMetrics
if(metrics.inputMetrics != None){
inputRecords += metrics.inputMetrics.get.recordsRead}
if(metrics.outputMetrics != None){
outputWritten += metrics.outputMetrics.get.recordsWritten }
}
})
val bp = sqlcontext.read.format("jdbc").option("url", "jdbc:mysql://...:3306/...").option("driver", "com.mysql.jdbc.Driver").option("dbtable", "bucks_payments").option("partitionColumn","id").option("lowerBound","1").option("upperBound","14596").option("numPartitions","10").option("fetchSize","100000").option("user", "hadoop").option("password", "...").load()
bp.save("org.apache.spark.sql.cassandra",SaveMode.Overwrite,options = Map( "table" -> "bucks_payments", "keyspace" -> "test"))
println("outputWritten",outputWritten)
结果:
scala> println("outputWritten",outputWritten)
(outputWritten,16383)
如何在任务或作业完成后立即在控制台(Spark Shell 或 Spark 提交作业)上收集这些指标。
我们正在使用 Spark 将数据从 Mysql 加载到 Cassandra,它非常庞大(例如:~200 GB 和 6 亿行)。任务完成后,我们想验证 spark 处理了多少行?我们可以从 Spark UI 中获取数字,但是我们如何从 spark shell 或 spark-submit job.
中检索该数字 ("Output Records Written")从 Mysql 加载到 Cassandra 的示例命令。
val pt = sqlcontext.read.format("jdbc").option("url", "jdbc:mysql://...:3306/...").option("driver", "com.mysql.jdbc.Driver").option("dbtable", "payment_types").option("user", "hadoop").option("password", "...").load()
pt.save("org.apache.spark.sql.cassandra",SaveMode.Overwrite,options = Map( "table" -> "payment_types", "keyspace" -> "test"))
我想检索上述任务的所有 Spark UI 指标,主要是输出大小和写入的记录。
请帮忙。
感谢您的宝贵时间!
找到答案了。您可以使用 SparkListener 获取统计信息。
如果您的作业没有输入或输出指标,您可能会遇到 None.get 异常,您可以通过提供 if stmt.
安全地忽略这些异常sc.addSparkListener(new SparkListener() {
override def onTaskEnd(taskEnd: SparkListenerTaskEnd) {
val metrics = taskEnd.taskMetrics
if(metrics.inputMetrics != None){
inputRecords += metrics.inputMetrics.get.recordsRead}
if(metrics.outputMetrics != None){
outputWritten += metrics.outputMetrics.get.recordsWritten }
}
})
请找到下面的例子。
import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
import com.datastax.spark.connector._
import org.apache.spark.sql._
import org.apache.spark.storage.StorageLevel
import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}
val conf = new SparkConf()
.set("spark.cassandra.connection.host", "...")
.set("spark.driver.allowMultipleContexts","true")
.set("spark.master","spark://....:7077")
.set("spark.driver.memory","1g")
.set("spark.executor.memory","10g")
.set("spark.shuffle.spill","true")
.set("spark.shuffle.memoryFraction","0.2")
.setAppName("CassandraTest")
sc.stop
val sc = new SparkContext(conf)
val sqlcontext = new org.apache.spark.sql.SQLContext(sc)
var outputWritten = 0L
sc.addSparkListener(new SparkListener() {
override def onTaskEnd(taskEnd: SparkListenerTaskEnd) {
val metrics = taskEnd.taskMetrics
if(metrics.inputMetrics != None){
inputRecords += metrics.inputMetrics.get.recordsRead}
if(metrics.outputMetrics != None){
outputWritten += metrics.outputMetrics.get.recordsWritten }
}
})
val bp = sqlcontext.read.format("jdbc").option("url", "jdbc:mysql://...:3306/...").option("driver", "com.mysql.jdbc.Driver").option("dbtable", "bucks_payments").option("partitionColumn","id").option("lowerBound","1").option("upperBound","14596").option("numPartitions","10").option("fetchSize","100000").option("user", "hadoop").option("password", "...").load()
bp.save("org.apache.spark.sql.cassandra",SaveMode.Overwrite,options = Map( "table" -> "bucks_payments", "keyspace" -> "test"))
println("outputWritten",outputWritten)
结果:
scala> println("outputWritten",outputWritten)
(outputWritten,16383)