Spark 流式自定义指标
Spark streaming custom metrics
我正在开发一个 Spark Streaming 程序,该程序检索 Kafka 流,对流进行非常基本的转换,然后将数据插入数据库(如果相关,则为 voltdb)。
我正在尝试测量将行插入数据库的速率。我认为 metrics can be useful (using JMX). However I can't find how to add custom metrics to Spark. I've looked at Spark's source code and also found this thread 但是它对我不起作用。我还在 conf.metrics 文件中启用了 JMX 接收器。不起作用的是我没有看到我的 JConsole 自定义指标。
有人可以解释如何添加自定义指标(最好通过 JMX)来激发流吗?或者如何测量我对数据库(特别是 VoltDB)的插入率?
我在 Java 8.
中使用 spark
这是一个很棒的教程,其中涵盖了使用 Graphite 设置 Spark Metrics System 所需的所有步骤。这应该可以解决问题:
http://www.hammerlab.org/2015/02/27/monitoring-spark-with-graphite-and-grafana/
好的,在深入研究 source code 之后,我发现了如何添加我自己的自定义指标。它需要 3 件事:
- 创建我自己的自定义 source. Sort of like this
- 在 spark metrics.properties 文件中启用 Jmx 接收器。我使用的具体行是:
*.sink.jmx.class=org.apache.spark.metrics.sink.JmxSink
为所有实例启用 JmxSink
- 在 SparkEnv 指标系统中注册我的自定义源。可以看到如何操作的示例 here - 我之前确实看过这个 link 但错过了注册部分,这使我无法在 JVisualVM
中实际看到我的自定义指标
我仍在为如何实际计算插入 VoltDB 的次数而苦苦挣扎,因为代码在执行程序上运行,但这是另一个主题的主题:)
我希望这对其他人有帮助
要根据 VoltDB 的插入插入行,请使用累加器 - 然后您可以从您的驱动程序创建一个侦听器 - 也许像这样可以帮助您入门
sparkContext.addSparkListener(new SparkListener() {
override def onStageCompleted(stageCompleted: SparkListenerStageCompleted) {
stageCompleted.stageInfo.accumulables.foreach { case (_, acc) => {
在这里您可以访问那些行组合的累加器,然后您可以发送到您的接收器..
Groupon 有一个名为 spark-metrics
that lets you use a simple (Codahale-like) API on your executors and have the results collated back in the driver and automatically registered in Spark's existing metrics registry. These then get automatically exported along with Spark's built-in metrics when you configure a metric sink as per the Spark docs 的库。
下面是 Java 中的一个工作示例。
它使用 StreaminQuery
进行了测试(不幸的是 StreaminQuery
在 Spark 2.3.1 之前没有像 StreamingContext
这样的 ootb 指标)。
步骤:
在Source
class
的同一个包中定义自定义源
package org.apache.spark.metrics.source;
import com.codahale.metrics.Gauge;
import com.codahale.metrics.MetricRegistry;
import lombok.Data;
import lombok.experimental.Accessors;
import org.apache.spark.sql.streaming.StreamingQueryProgress;
/**
* Metrics source for structured streaming query.
*/
public class StreamingQuerySource implements Source {
private String appName;
private MetricRegistry metricRegistry = new MetricRegistry();
private final Progress progress = new Progress();
public StreamingQuerySource(String appName) {
this.appName = appName;
registerGuage("batchId", () -> progress.batchId());
registerGuage("numInputRows", () -> progress.numInputRows());
registerGuage("inputRowsPerSecond", () -> progress.inputRowsPerSecond());
registerGuage("processedRowsPerSecond", () -> progress.processedRowsPerSecond());
}
private <T> Gauge<T> registerGuage(String name, Gauge<T> metric) {
return metricRegistry.register(MetricRegistry.name(name), metric);
}
@Override
public String sourceName() {
return String.format("%s.streaming", appName);
}
@Override
public MetricRegistry metricRegistry() {
return metricRegistry;
}
public void updateProgress(StreamingQueryProgress queryProgress) {
progress.batchId(queryProgress.batchId())
.numInputRows(queryProgress.numInputRows())
.inputRowsPerSecond(queryProgress.inputRowsPerSecond())
.processedRowsPerSecond(queryProgress.processedRowsPerSecond());
}
@Data
@Accessors(fluent = true)
private static class Progress {
private long batchId = -1;
private long numInputRows = 0;
private double inputRowsPerSecond = 0;
private double processedRowsPerSecond = 0;
}
}
创建 SparkContext 后立即注册源
querySource = new StreamingQuerySource(getSparkSession().sparkContext().appName());
SparkEnv.get().metricsSystem().registerSource(querySource);
在StreamingQueryListener.onProgress(事件)
中更新数据
querySource.updateProgress(event.progress());
配置metrics.properties
*.sink.graphite.class=org.apache.spark.metrics.sink.GraphiteSink
*.sink.graphite.host=xxx
*.sink.graphite.port=9109
*.sink.graphite.period=10
*.sink.graphite.unit=seconds
# Enable jvm source for instance master, worker, driver and executor
master.source.jvm.class=org.apache.spark.metrics.source.JvmSource
worker.source.jvm.class=org.apache.spark.metrics.source.JvmSource
driver.source.jvm.class=org.apache.spark.metrics.source.JvmSource
executor.source.jvm.class=org.apache.spark.metrics.source.JvmSource
石墨导出器中的示例输出(映射到普罗米修斯格式)
streaming_query{application="local-1538032184639",model="model1",qty="batchId"} 38
streaming_query{application="local-1538032184639",model="model1r",qty="inputRowsPerSecond"} 2.5
streaming_query{application="local-1538032184639",model="model1",qty="numInputRows"} 5
streaming_query{application="local-1538032184639",model=model1",qty="processedRowsPerSecond"} 0.81
我正在开发一个 Spark Streaming 程序,该程序检索 Kafka 流,对流进行非常基本的转换,然后将数据插入数据库(如果相关,则为 voltdb)。 我正在尝试测量将行插入数据库的速率。我认为 metrics can be useful (using JMX). However I can't find how to add custom metrics to Spark. I've looked at Spark's source code and also found this thread 但是它对我不起作用。我还在 conf.metrics 文件中启用了 JMX 接收器。不起作用的是我没有看到我的 JConsole 自定义指标。
有人可以解释如何添加自定义指标(最好通过 JMX)来激发流吗?或者如何测量我对数据库(特别是 VoltDB)的插入率? 我在 Java 8.
中使用 spark这是一个很棒的教程,其中涵盖了使用 Graphite 设置 Spark Metrics System 所需的所有步骤。这应该可以解决问题:
http://www.hammerlab.org/2015/02/27/monitoring-spark-with-graphite-and-grafana/
好的,在深入研究 source code 之后,我发现了如何添加我自己的自定义指标。它需要 3 件事:
- 创建我自己的自定义 source. Sort of like this
- 在 spark metrics.properties 文件中启用 Jmx 接收器。我使用的具体行是:
*.sink.jmx.class=org.apache.spark.metrics.sink.JmxSink
为所有实例启用 JmxSink - 在 SparkEnv 指标系统中注册我的自定义源。可以看到如何操作的示例 here - 我之前确实看过这个 link 但错过了注册部分,这使我无法在 JVisualVM 中实际看到我的自定义指标
我仍在为如何实际计算插入 VoltDB 的次数而苦苦挣扎,因为代码在执行程序上运行,但这是另一个主题的主题:)
我希望这对其他人有帮助
要根据 VoltDB 的插入插入行,请使用累加器 - 然后您可以从您的驱动程序创建一个侦听器 - 也许像这样可以帮助您入门
sparkContext.addSparkListener(new SparkListener() {
override def onStageCompleted(stageCompleted: SparkListenerStageCompleted) {
stageCompleted.stageInfo.accumulables.foreach { case (_, acc) => {
在这里您可以访问那些行组合的累加器,然后您可以发送到您的接收器..
Groupon 有一个名为 spark-metrics
that lets you use a simple (Codahale-like) API on your executors and have the results collated back in the driver and automatically registered in Spark's existing metrics registry. These then get automatically exported along with Spark's built-in metrics when you configure a metric sink as per the Spark docs 的库。
下面是 Java 中的一个工作示例。
它使用 StreaminQuery
进行了测试(不幸的是 StreaminQuery
在 Spark 2.3.1 之前没有像 StreamingContext
这样的 ootb 指标)。
步骤:
在Source
class
package org.apache.spark.metrics.source;
import com.codahale.metrics.Gauge;
import com.codahale.metrics.MetricRegistry;
import lombok.Data;
import lombok.experimental.Accessors;
import org.apache.spark.sql.streaming.StreamingQueryProgress;
/**
* Metrics source for structured streaming query.
*/
public class StreamingQuerySource implements Source {
private String appName;
private MetricRegistry metricRegistry = new MetricRegistry();
private final Progress progress = new Progress();
public StreamingQuerySource(String appName) {
this.appName = appName;
registerGuage("batchId", () -> progress.batchId());
registerGuage("numInputRows", () -> progress.numInputRows());
registerGuage("inputRowsPerSecond", () -> progress.inputRowsPerSecond());
registerGuage("processedRowsPerSecond", () -> progress.processedRowsPerSecond());
}
private <T> Gauge<T> registerGuage(String name, Gauge<T> metric) {
return metricRegistry.register(MetricRegistry.name(name), metric);
}
@Override
public String sourceName() {
return String.format("%s.streaming", appName);
}
@Override
public MetricRegistry metricRegistry() {
return metricRegistry;
}
public void updateProgress(StreamingQueryProgress queryProgress) {
progress.batchId(queryProgress.batchId())
.numInputRows(queryProgress.numInputRows())
.inputRowsPerSecond(queryProgress.inputRowsPerSecond())
.processedRowsPerSecond(queryProgress.processedRowsPerSecond());
}
@Data
@Accessors(fluent = true)
private static class Progress {
private long batchId = -1;
private long numInputRows = 0;
private double inputRowsPerSecond = 0;
private double processedRowsPerSecond = 0;
}
}
创建 SparkContext 后立即注册源
querySource = new StreamingQuerySource(getSparkSession().sparkContext().appName());
SparkEnv.get().metricsSystem().registerSource(querySource);
在StreamingQueryListener.onProgress(事件)
中更新数据 querySource.updateProgress(event.progress());
配置metrics.properties
*.sink.graphite.class=org.apache.spark.metrics.sink.GraphiteSink
*.sink.graphite.host=xxx
*.sink.graphite.port=9109
*.sink.graphite.period=10
*.sink.graphite.unit=seconds
# Enable jvm source for instance master, worker, driver and executor
master.source.jvm.class=org.apache.spark.metrics.source.JvmSource
worker.source.jvm.class=org.apache.spark.metrics.source.JvmSource
driver.source.jvm.class=org.apache.spark.metrics.source.JvmSource
executor.source.jvm.class=org.apache.spark.metrics.source.JvmSource
石墨导出器中的示例输出(映射到普罗米修斯格式)
streaming_query{application="local-1538032184639",model="model1",qty="batchId"} 38
streaming_query{application="local-1538032184639",model="model1r",qty="inputRowsPerSecond"} 2.5
streaming_query{application="local-1538032184639",model="model1",qty="numInputRows"} 5
streaming_query{application="local-1538032184639",model=model1",qty="processedRowsPerSecond"} 0.81