用于多个测试的 MiniDFS 集群设置 类 抛出 java.net.BindException:地址已在使用中

MiniDFS cluster setup for multiple test classes throws java.net.BindException: Address already in use

我正在为 reads/writes 数据 from/to hdfs 文件和 spark 目录的 spark 代码编写单元测试用例。为此,我创建了一个单独的特征来提供 minidfs 集群的初始化,并且我在创建 SparkSession 对象时使用生成的 hdfs uri 的值 - spark.sql.warehouse.dir。这是它的代码 -

trait TestSparkSession extends BeforeAndAfterAll {
  self: Suite =>

  var hdfsCluster: MiniDFSCluster = _

  def nameNodeURI: String = s"hdfs://localhost:${hdfsCluster.getNameNodePort}/"

  def withLocalSparkSession(tests: SparkSession => Any): Any = {
    val baseDir = new File(PathUtils.getTestDir(getClass), "miniHDFS")
    val conf = new HdfsConfiguration()
    conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, baseDir.getAbsolutePath)
    val builder = new MiniDFSCluster.Builder(conf)
    hdfsCluster = builder.nameNodePort(9000)
      .manageNameDfsDirs(true)
      .manageDataDfsDirs(true)
      .format(true)
      .build()
    hdfsCluster.waitClusterUp()

    val testSpark = SparkSession
      .builder()
      .master("local")
      .appName("Test App")
      .config("spark.sql.warehouse.dir", s"${nameNodeURI}spark-warehouse/")
      .getOrCreate()
    tests(testSpark)  
  }

  def stopHdfs(): Unit = hdfsCluster.shutdown(true, true)

  override def afterAll(): Unit = stopHdfs()

}

在编写测试时,我继承了这个特性,然后编写测试用例,如 -

class SampleSpec extends FunSuite with TestSparkSession {
     withLocalSparkSession {
         testSpark =>
         import testSpark.implicits._

         // Test 1 Here
         // Test 2 Here
     }
}

当我 运行 我的测试 class 一次一个时,一切正常。但是当 运行 它们同时抛出 java.net.BindException: Address already in use 时。 这应该意味着在执行下一组测试时,已经创建的 hdfsCluster 尚未关闭。这就是为什么它无法创建另一个绑定到同一端口的原因。但随后在 afterAll() 中我停止了 hfdsCluster。

我的问题是我可以共享 hdfs 集群和 spark 会话的单个实例而不是每次都初始化它吗?我试图在方法之外提取初始化,但它仍然抛出相同的异常。即使我无法共享它,我如何才能正确停止我的集群并在下一次测试 class 执行时重新初始化它?

此外,请告诉我我编写 'unit' 使用 SparkSession 和 HDFS 存储的测试用例的方法是否正确。

任何帮助将不胜感激。

我通过在伴随对象中创建 hdfs 集群来解决它,以便它为所有测试服创建它的单个实例。