How to check StreamingQuery performance metrics in Structured Streaming?
I want to get information such as triggerExecution, inputRowsPerSecond, numInputRows, and processedRowsPerSecond from a streaming query.
I am using the rate format to generate 10 rows per second, and a QueryProgressEvent to retrieve all the metrics.
However, when printing QueryProgressEvent.inputRowsPerSecond to the console, I get incorrect values such as 625.0 and 666.66.
Can someone explain why it produces values like these? Code and sample output below:
import org.apache.spark.sql.DataFrame

spark.streams.addListener(new EventMetric())

val df = spark.readStream
  .format("rate")
  .option("rowsPerSecond", 10)
  .option("numPartitions", 1)
  .load()
  .select($"value", $"timestamp")

df.writeStream
  .outputMode("append")
  .option("checkpointLocation", "/testjob")
  .foreachBatch((batchDf: DataFrame, batchId: Long) => {
    // rowCountAcc is a LongAccumulator defined elsewhere in the job
    println("rowcount value >>>> " + rowCountAcc.value)
    val outputDf = batchDf
    outputDf.write
      .format("console")
      .mode("append")
      .save()
  })
  .start()
  .awaitTermination()
StreamingQueryListener:
import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener._

class EventMetric extends StreamingQueryListener {
  override def onQueryStarted(event: QueryStartedEvent): Unit = {}

  override def onQueryProgress(event: QueryProgressEvent): Unit = {
    val p = event.progress
    // println("id : " + p.id)
    println("runId : " + p.runId)
    // println("name : " + p.name)
    println("batchid : " + p.batchId)
    println("timestamp : " + p.timestamp)
    println("triggerExecution : " + p.durationMs.get("triggerExecution"))
    println(p.eventTime)
    println("inputRowsPerSecond : " + p.inputRowsPerSecond)
    println("numInputRows : " + p.numInputRows)
    println("processedRowsPerSecond : " + p.processedRowsPerSecond)
    println(">>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>")
  }

  override def onQueryTerminated(event: QueryTerminatedEvent): Unit = {}
}
Output 1:
runId : bc7f97c1-687f-4125-806a-dc573e006dcd
batchid : 164
timestamp : 2020-12-12T12:31:14.323Z
triggerExecution : 453
{}
inputRowsPerSecond : 625.0
numInputRows : 10
processedRowsPerSecond : 22.075055187637968
Output 2:
runId : bc7f97c1-687f-4125-806a-dc573e006dcd
batchid : 168
timestamp : 2020-12-12T12:31:18.326Z
triggerExecution : 453
{}
inputRowsPerSecond : 666.6666666666667
numInputRows : 10
processedRowsPerSecond : 22.075055187637968
EDIT:
Also, if 625 is the input rate, why is processedRowsPerSecond so low for a job that does not actually do any transformation?
UPDATE :: Pretty-printed progress JSON:
Batch 1:
runId : 16c82066-dea0-4e0d-8a1e-ad1df55ad516
batchid : 198
timestamp : 2020-12-13T16:23:14.331Z
triggerExecution : 422
{}
inputRowsPerSecond : 666.6666666666667
numInputRows : 10
processedRowsPerSecond : 23.696682464454977
json : {
"id" : "f8af5400-533c-4f7f-8b01-b365dc736735",
"runId" : "16c82066-dea0-4e0d-8a1e-ad1df55ad516",
"name" : null,
"timestamp" : "2020-12-13T16:23:14.331Z",
"batchId" : 198,
"numInputRows" : 10,
"inputRowsPerSecond" : 666.6666666666667,
"processedRowsPerSecond" : 23.696682464454977,
"durationMs" : {
"addBatch" : 47,
"getBatch" : 0,
"getEndOffset" : 0,
"queryPlanning" : 0,
"setOffsetRange" : 0,
"triggerExecution" : 422,
"walCommit" : 234
},
"stateOperators" : [ ],
"sources" : [ {
"description" : "RateStreamV2[rowsPerSecond=10, rampUpTimeSeconds=0, numPartitions=1",
"startOffset" : 212599,
"endOffset" : 212600,
"numInputRows" : 10,
"inputRowsPerSecond" : 666.6666666666667,
"processedRowsPerSecond" : 23.696682464454977
} ],
"sink" : {
"description" : "ForeachBatchSink"
}
}
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
Batch 2:
runId : 16c82066-dea0-4e0d-8a1e-ad1df55ad516
batchid : 191
timestamp : 2020-12-13T16:23:07.328Z
triggerExecution : 421
{}
inputRowsPerSecond : 625.0
numInputRows : 10
processedRowsPerSecond : 23.752969121140143
json : {
"id" : "f8af5400-533c-4f7f-8b01-b365dc736735",
"runId" : "16c82066-dea0-4e0d-8a1e-ad1df55ad516",
"name" : null,
"timestamp" : "2020-12-13T16:23:07.328Z",
"batchId" : 191,
"numInputRows" : 10,
"inputRowsPerSecond" : 625.0,
"processedRowsPerSecond" : 23.752969121140143,
"durationMs" : {
"addBatch" : 62,
"getBatch" : 0,
"getEndOffset" : 0,
"queryPlanning" : 16,
"setOffsetRange" : 0,
"triggerExecution" : 421,
"walCommit" : 187
},
"stateOperators" : [ ],
"sources" : [ {
"description" : "RateStreamV2[rowsPerSecond=10, rampUpTimeSeconds=0, numPartitions=1",
"startOffset" : 212592,
"endOffset" : 212593,
"numInputRows" : 10,
"inputRowsPerSecond" : 625.0,
"processedRowsPerSecond" : 23.752969121140143
} ],
"sink" : {
"description" : "ForeachBatchSink"
}
}
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
Keep in mind that producing 10 rows per second says nothing about the input rate within the overall streaming query.

You did not set a Trigger in your writeStream call, which means the streaming query gets triggered as soon as it is done processing and new data is available.
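If you want the batches to fire at a fixed cadence instead, you can set a processing-time trigger explicitly. A minimal sketch, reusing the df from the question; the one-second interval is an assumption picked to line up with the rate source:

import org.apache.spark.sql.streaming.Trigger

// Fire a micro-batch every second instead of as soon as the previous
// one finishes; "1 second" is a hypothetical choice, not a requirement.
df.writeStream
  .outputMode("append")
  .option("checkpointLocation", "/testjob")
  .trigger(Trigger.ProcessingTime("1 second"))
  .format("console")
  .start()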
Now, the streaming query obviously does not require the full second to read those 10 rows, but only a fraction of it. "inputRowsPerSecond" is more a measure of how fast the input data was read. Due to rounding you may also see different values in different batches. Check the field "timestamp" in the output: the batches are not exactly one second apart, but usually off by a few milliseconds.

The job needs just a few milliseconds to read the data, and this can differ slightly from batch to batch. In batch 164 the job took 16 ms, and in batch 168 it took 15 ms, to read the 10 messages.

Batch 164 => 10 / 0.016 sec = 625 messages per second
Batch 168 => 10 / 0.015 sec = 666.6667 messages per second
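The same arithmetic as a quick sanity check in code (the 16 ms and 15 ms read times are the values inferred above, not fields taken from the progress report):

// rows divided by the read time (converted from ms to seconds)
// reproduces both of the reported input rates
def inputRate(rows: Long, readMillis: Long): Double = rows * 1000.0 / readMillis

println(inputRate(10, 16)) // 625.0
println(inputRate(10, 15)) // 666.6666666666666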
processedRowsPerSecond is calculated based on triggerExecution:

1000 / triggerExecution x 10 msg = 1000 / 421 x 10 msg = 23.752969
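And the same check for the value reported in batch 191, mirroring the formula above (a worked example, not how Spark itself exposes the metric):

// numInputRows divided by triggerExecution (ms converted to seconds)
// yields the reported processedRowsPerSecond
val triggerExecutionMs = 421L
val processedRowsPerSecond = 10 * 1000.0 / triggerExecutionMs

println(processedRowsPerSecond) // 23.752969121140143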