Apache Beam 管道缺少指标(通过 SparkRunner / Dataproc)

Missing Metrics for Apache Beam Pipeline (via SparkRunner / Dataproc)

我目前正在通过 Spark Runner 向在 Google Dataproc 上运行的现有管道添加一些指标,我正在尝试确定如何访问这些指标并最终将它们公开给Stackdriver(将在 Grafana 仪表板的下游使用)。

指标本身相当简单(一系列计数器)并且是这样定义的(并在整个管道的 DoFns 中访问):

object Metrics {
   val exampleMetric: Counter = Metrics.counter(ExamplePipeline::class.qualifiedName, "count")

   // Others omitted for brevity
}

此指标(和其他指标)在管道的整个过程中在各种 DoFn 调用中递增,并且几个单元测试确认管道中的 MetricQueryResults 对象在通过DirectRunner.

这里的主要问题是,我在 Dataproc 或 GCP 中公开的任何相关 UI(YARN ResourceManager、Spark History Server、YARN Application Timeline 等)中没有看到这些指标是正在发射。我已经尝试通过日志和其他任何地方进行搜索,但我没有看到这些自定义指标的任何迹象(或者实际上任何指标通常从 Spark and/or 发送到 Stackdriver)。

工作配置

Spark 作业本身是通过脚本中的以下命令配置的(假设已将适当的 .jar 文件复制到 GCP 中的适当存储桶中:

gcloud dataproc jobs submit spark --jar $bucket/deployment/example-pipeline.jar \
       --project $project_name \
       --cluster $cluster_name \
       --region $region  \
       --id pipeline-$timestamp \
       --driver-log-levels $lots_of_things_here \
       --properties=spark.dynamicAllocation.enabled=false \
       --labels="type"="example-pipeline","namespace"="$namespace" \
       --async \
       -- \
         --runner=SparkRunner \
         --streaming

集群配置

集群本身似乎启用了我能想到的所有与指标相关的 属性,例如:

dataproc:dataproc.logging.stackdriver.enable=true
dataproc:dataproc.logging.stackdriver.job.driver.enable=true
dataproc:dataproc.monitoring.stackdriver.enable=true
dataproc:spark.submit.deployMode=cluster
spark:spark.eventLog.dir=hdfs:///var/log/spark/apps
spark:spark.eventLog.enabled=true
yarn:yarn.log-aggregation-enable=true
yarn:yarn.log-aggregation.retain-seconds=-1

这些只是集群上的一小部分属性,但是还有无数其他属性,因此如果有一个似乎丢失或不正确(因为它与指标故事有关),请随时询问。

问题

我不得不想象这是一个相当常见的用例(对于 Spark / Dataproc / Beam),但我不确定配置难题的哪些部分丢失了,并且 documentation/articles 与此过程相关看起来很稀疏。

提前致谢!

遗憾的是,截至目前,Dataproc 没有针对 Spark 系统和自定义指标的 StackDriver 集成。

可以通过配置 /etc/spark/conf/metrics.properties(您可以从 /etc/spark/conf/metrics.properties.template 复制)或通过 cluster/job 属性启用 Spark 系统指标。在此 doc 中查看更多信息。充其量,您可以在集群中以 CSV 文件或 HTTP 服务的形式提供这些指标,但尚未与 StackDriver 集成。

对于 Spark 自定义指标,您可能需要实现自己的源代码,例如 ,然后它可以在集群中作为系统指标提供,如上所述。