用于多个测试的 MiniDFS 集群设置 类 抛出 java.net.BindException:地址已在使用中
MiniDFS cluster setup for multiple test classes throws java.net.BindException: Address already in use
我正在为 reads/writes 数据 from/to hdfs 文件和 spark 目录的 spark 代码编写单元测试用例。为此,我创建了一个单独的特征来提供 minidfs 集群的初始化,并且我在创建 SparkSession 对象时使用生成的 hdfs uri 的值 - spark.sql.warehouse.dir
。这是它的代码 -
trait TestSparkSession extends BeforeAndAfterAll {
self: Suite =>
var hdfsCluster: MiniDFSCluster = _
def nameNodeURI: String = s"hdfs://localhost:${hdfsCluster.getNameNodePort}/"
def withLocalSparkSession(tests: SparkSession => Any): Any = {
val baseDir = new File(PathUtils.getTestDir(getClass), "miniHDFS")
val conf = new HdfsConfiguration()
conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, baseDir.getAbsolutePath)
val builder = new MiniDFSCluster.Builder(conf)
hdfsCluster = builder.nameNodePort(9000)
.manageNameDfsDirs(true)
.manageDataDfsDirs(true)
.format(true)
.build()
hdfsCluster.waitClusterUp()
val testSpark = SparkSession
.builder()
.master("local")
.appName("Test App")
.config("spark.sql.warehouse.dir", s"${nameNodeURI}spark-warehouse/")
.getOrCreate()
tests(testSpark)
}
def stopHdfs(): Unit = hdfsCluster.shutdown(true, true)
override def afterAll(): Unit = stopHdfs()
}
在编写测试时,我继承了这个特性,然后编写测试用例,如 -
class SampleSpec extends FunSuite with TestSparkSession {
withLocalSparkSession {
testSpark =>
import testSpark.implicits._
// Test 1 Here
// Test 2 Here
}
}
当我 运行 我的测试 class 一次一个时,一切正常。但是当 运行 它们同时抛出 java.net.BindException: Address already in use
时。
这应该意味着在执行下一组测试时,已经创建的 hdfsCluster 尚未关闭。这就是为什么它无法创建另一个绑定到同一端口的原因。但随后在 afterAll() 中我停止了 hfdsCluster。
我的问题是我可以共享 hdfs 集群和 spark 会话的单个实例而不是每次都初始化它吗?我试图在方法之外提取初始化,但它仍然抛出相同的异常。即使我无法共享它,我如何才能正确停止我的集群并在下一次测试 class 执行时重新初始化它?
此外,请告诉我我编写 'unit' 使用 SparkSession 和 HDFS 存储的测试用例的方法是否正确。
任何帮助将不胜感激。
我通过在伴随对象中创建 hdfs 集群来解决它,以便它为所有测试服创建它的单个实例。
我正在为 reads/writes 数据 from/to hdfs 文件和 spark 目录的 spark 代码编写单元测试用例。为此,我创建了一个单独的特征来提供 minidfs 集群的初始化,并且我在创建 SparkSession 对象时使用生成的 hdfs uri 的值 - spark.sql.warehouse.dir
。这是它的代码 -
trait TestSparkSession extends BeforeAndAfterAll {
self: Suite =>
var hdfsCluster: MiniDFSCluster = _
def nameNodeURI: String = s"hdfs://localhost:${hdfsCluster.getNameNodePort}/"
def withLocalSparkSession(tests: SparkSession => Any): Any = {
val baseDir = new File(PathUtils.getTestDir(getClass), "miniHDFS")
val conf = new HdfsConfiguration()
conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, baseDir.getAbsolutePath)
val builder = new MiniDFSCluster.Builder(conf)
hdfsCluster = builder.nameNodePort(9000)
.manageNameDfsDirs(true)
.manageDataDfsDirs(true)
.format(true)
.build()
hdfsCluster.waitClusterUp()
val testSpark = SparkSession
.builder()
.master("local")
.appName("Test App")
.config("spark.sql.warehouse.dir", s"${nameNodeURI}spark-warehouse/")
.getOrCreate()
tests(testSpark)
}
def stopHdfs(): Unit = hdfsCluster.shutdown(true, true)
override def afterAll(): Unit = stopHdfs()
}
在编写测试时,我继承了这个特性,然后编写测试用例,如 -
class SampleSpec extends FunSuite with TestSparkSession {
withLocalSparkSession {
testSpark =>
import testSpark.implicits._
// Test 1 Here
// Test 2 Here
}
}
当我 运行 我的测试 class 一次一个时,一切正常。但是当 运行 它们同时抛出 java.net.BindException: Address already in use
时。
这应该意味着在执行下一组测试时,已经创建的 hdfsCluster 尚未关闭。这就是为什么它无法创建另一个绑定到同一端口的原因。但随后在 afterAll() 中我停止了 hfdsCluster。
我的问题是我可以共享 hdfs 集群和 spark 会话的单个实例而不是每次都初始化它吗?我试图在方法之外提取初始化,但它仍然抛出相同的异常。即使我无法共享它,我如何才能正确停止我的集群并在下一次测试 class 执行时重新初始化它?
此外,请告诉我我编写 'unit' 使用 SparkSession 和 HDFS 存储的测试用例的方法是否正确。
任何帮助将不胜感激。
我通过在伴随对象中创建 hdfs 集群来解决它,以便它为所有测试服创建它的单个实例。