How to configure a custom Spark Plugin in Databricks?

How do I correctly configure a Spark plugin, and the jar containing the plugin class, in Databricks?

I have created the following Spark 3 plugin class in Scala, CustomExecSparkPlugin.scala:

package com.example

import org.apache.spark.api.plugin.{SparkPlugin, DriverPlugin, ExecutorPlugin}

class CustomExecSparkPlugin extends SparkPlugin  {
 
  override def driverPlugin(): DriverPlugin = {
    new DriverPlugin() {
      override def shutdown(): Unit = {
        // custom code        
      }
    }
  }

  override def executorPlugin(): ExecutorPlugin = {
    new ExecutorPlugin() {
      override def shutdown(): Unit = {
        // custom code  
      }
    }
  }
}
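For context, the jar is produced with a plain sbt build; a minimal build.sbt sketch (project name and patch versions assumed, with Spark marked as provided since it is supplied by the Databricks runtime) looks roughly like this:

// build.sbt -- minimal sketch, not the exact build file used
name := "custom-spark-plugin"
version := "0.1.0"
scalaVersion := "2.12.10"

// Spark is provided by DBR 7.3, so it must not be bundled into the jar
libraryDependencies += "org.apache.spark" %% "spark-core" % "3.0.1" % "provided"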

I have packaged it into a jar and uploaded it to DBFS. During creation of a DBR 7.3 (Spark 3.0.1, Scala 2.12) cluster, I set the following Spark configurations (under Advanced options):

spark.plugins com.example.CustomExecSparkPlugin
spark.driver.extraClassPath /dbfs/path/to/jar
spark.executor.extraClassPath /dbfs/path/to/jar

However, cluster creation fails with the exception: com.example.CustomExecSparkPlugin not found in com.databricks.backend.daemon.driver.ClassLoaders$LibraryClassLoader@622d7e4

Driver log4j logs:

21/11/01 13:33:01 ERROR SparkContext: Error initializing SparkContext.
java.lang.ClassNotFoundException: com.example.CustomExecSparkPlugin not found in com.databricks.backend.daemon.driver.ClassLoaders$LibraryClassLoader@622d7e4
    at com.databricks.backend.daemon.driver.ClassLoaders$MultiReplClassLoader.loadClass(ClassLoaders.scala:115)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:352)
    at java.lang.Class.forName0(Native Method)
    at java.lang.Class.forName(Class.java:348)
    at org.apache.spark.util.Utils$.classForName(Utils.scala:226)
    at org.apache.spark.util.Utils$.$anonfun$loadExtensions(Utils.scala:3006)
    at scala.collection.TraversableLike.$anonfun$flatMap(TraversableLike.scala:245)
    at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
    at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
    at scala.collection.TraversableLike.flatMap(TraversableLike.scala:245)
    at scala.collection.TraversableLike.flatMap$(TraversableLike.scala:242)
    at scala.collection.AbstractTraversable.flatMap(Traversable.scala:108)
    at org.apache.spark.util.Utils$.loadExtensions(Utils.scala:3004)
    at org.apache.spark.internal.plugin.PluginContainer$.apply(PluginContainer.scala:160)
    at org.apache.spark.internal.plugin.PluginContainer$.apply(PluginContainer.scala:146)
    at org.apache.spark.SparkContext.<init>(SparkContext.scala:591)
    at com.databricks.backend.daemon.driver.DatabricksILoop$.$anonfun$initializeSharedDriverContext(DatabricksILoop.scala:347)
    at com.databricks.backend.daemon.driver.ClassLoaders$.withContextClassLoader(ClassLoaders.scala:29)
    at com.databricks.backend.daemon.driver.DatabricksILoop$.initializeSharedDriverContext(DatabricksILoop.scala:347)
    at com.databricks.backend.daemon.driver.DatabricksILoop$.getOrCreateSharedDriverContext(DatabricksILoop.scala:277)
    at com.databricks.backend.daemon.driver.DriverCorral.com$databricks$backend$daemon$driver$DriverCorral$$driverContext(DriverCorral.scala:179)
    at com.databricks.backend.daemon.driver.DriverCorral.<init>(DriverCorral.scala:216)
    at com.databricks.backend.daemon.driver.DriverDaemon.<init>(DriverDaemon.scala:39)
    at com.databricks.backend.daemon.driver.DriverDaemon$.create(DriverDaemon.scala:211)
    at com.databricks.backend.daemon.driver.DriverDaemon$.wrappedMain(DriverDaemon.scala:216)
    at com.databricks.DatabricksMain.$anonfun$main(DatabricksMain.scala:106)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at com.databricks.DatabricksMain.$anonfun$withStartupProfilingData(DatabricksMain.scala:321)
    at com.databricks.logging.UsageLogging.$anonfun$recordOperation(UsageLogging.scala:431)
    at com.databricks.logging.UsageLogging.$anonfun$withAttributionContext(UsageLogging.scala:239)
    at scala.util.DynamicVariable.withValue(DynamicVariable.scala:62)
    at com.databricks.logging.UsageLogging.withAttributionContext(UsageLogging.scala:234)
    at com.databricks.logging.UsageLogging.withAttributionContext$(UsageLogging.scala:231)
    at com.databricks.DatabricksMain.withAttributionContext(DatabricksMain.scala:74)
    at com.databricks.logging.UsageLogging.withAttributionTags(UsageLogging.scala:276)
    at com.databricks.logging.UsageLogging.withAttributionTags$(UsageLogging.scala:269)
    at com.databricks.DatabricksMain.withAttributionTags(DatabricksMain.scala:74)
    at com.databricks.logging.UsageLogging.recordOperation(UsageLogging.scala:412)
    at com.databricks.logging.UsageLogging.recordOperation$(UsageLogging.scala:338)
    at com.databricks.DatabricksMain.recordOperation(DatabricksMain.scala:74)
    at com.databricks.DatabricksMain.withStartupProfilingData(DatabricksMain.scala:321)
    at com.databricks.DatabricksMain.main(DatabricksMain.scala:105)
    at com.databricks.backend.daemon.driver.DriverDaemon.main(DriverDaemon.scala)
Caused by: java.lang.ClassNotFoundException: com.example.CustomExecSparkPlugin
    at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:419)
    at com.databricks.backend.daemon.driver.ClassLoaders$LibraryClassLoader.loadClass(ClassLoaders.scala:151)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:352)
    at com.databricks.backend.daemon.driver.ClassLoaders$MultiReplClassLoader.loadClass(ClassLoaders.scala:112)
    ... 43 more
21/11/01 13:33:02 INFO AbstractConnector: Stopped Spark@b6bccb4{HTTP/1.1,[http/1.1]}{10.88.234.70:40001}
21/11/01 13:33:02 INFO SparkUI: Stopped Spark web UI at http://10.88.234.70:40001
21/11/01 13:33:02 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
21/11/01 13:33:02 INFO MemoryStore: MemoryStore cleared
21/11/01 13:33:02 INFO BlockManager: BlockManager stopped
21/11/01 13:33:02 INFO BlockManagerMaster: BlockManagerMaster stopped
21/11/01 13:33:02 WARN MetricsSystem: Stopping a MetricsSystem that is not running
21/11/01 13:33:02 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
21/11/01 13:33:02 INFO SparkContext: Successfully stopped SparkContext

You could consider adding it as an init script instead. An init script gives you a chance to add the jar to the cluster before Spark starts, which is probably what the Spark plugin expects.

  • Upload your jar to DBFS, e.g. dbfs:/databricks/plugins
  • Create a bash script like the one below and upload it to the same location.
  • Create/edit the cluster with that init script specified (see the cluster-spec sketch after the script).
#!/bin/bash

STAGE_DIR="/dbfs/databricks/plugins/

echo "BEGIN: Upload Spark Plugins"
cp -f $STAGE_DIR/*.jar /mnt/driver-daemon/jars || { echo "Error copying Spark Plugin library file"; exit 1;}
echo "END: Upload Spark Plugin JARs"

echo "BEGIN: Modify Spark config settings"
cat << 'EOF' > /databricks/driver/conf/spark-plugin-driver-defaults.conf
[driver] {
   "spark.plugins" = "com.example.CustomExecSparkPlugin"
}
EOF
echo "END: Modify Spark config settings"

I believe copying the jar to /mnt/driver-daemon/jars will make Spark aware of the jar before Spark is fully initialized (doc). I'm less sure whether it will have any effect on the executors :(
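If the executors also need the plugin class, one thing worth trying (an assumption on my part, not something I have verified on DBR 7.3) is to have the same init script copy the jar into /databricks/jars as well, since cluster-scoped init scripts run on every node and that directory sits on the classpath of the driver and executor JVMs:

# Assumption: /databricks/jars is on the classpath of all nodes; verify on your DBR version
cp -f $STAGE_DIR/*.jar /databricks/jars/ || { echo "Error copying Spark Plugin library file"; exit 1; }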