Unit Testing Spark DataFrame transformation chaining
I am new to the Scala/Spark ecosystem and would like to know the best way to unit test chained DataFrame transformations. Here is a code sample of the method I want to test:
def writeToParquet(spark: SparkSession, dataFrame: DataFrame, col1: DataType1, col2: DataType2): Unit = {
  dataFrame
    .withColumn("date", some_columnar_date_logic)
    .withColumn("hour", some_more_functional_logic)
    // ... a couple more transformation steps
    .write
    .mode(SaveMode.Append)
    .partitionBy("col1", "col2", "col3")
    .parquet("some hdfs/s3/url")
}
The problem is that parquet has a Unit return type, which makes testing difficult. This is further amplified by the fact that transformations are essentially immutable, which makes mocking and spying somewhat harder.
To create the DataFrames, I dump my test datasets into CSV files.
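For reference, a minimal sketch of how such a CSV fixture might be loaded in a test (the path and options here are hypothetical):

// Hypothetical fixture path and options; adjust to the actual test data.
val testDf = spark.read
  .option("header", "true")
  .option("inferSchema", "true")
  .csv("src/test/resources/test_input.csv")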
Please find a simple example of DataFrame unit testing below. You can split this into two parts. First, test the transformations; for the written files, a simple shell script (or the read-back sketch after the build configuration below) can verify the output.
import com.holdenkarau.spark.testing._
import org.apache.spark.sql.{DataFrame, Row}
import org.apache.spark.sql.functions.lit
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
import org.scalatest.{FunSuite, Matchers}

class SomeDFTest extends FunSuite with Matchers with DataFrameSuiteBase {

  import spark.implicits._

  test("Testing Input customer data date transformation") {
    val inputSchema = List(
      StructField("number", IntegerType, false),
      StructField("word", StringType, false)
    )
    val expectedSchema = List(
      StructField("number", IntegerType, false),
      StructField("word", StringType, false),
      StructField("dummyColumn", StringType, false)
    )
    val inputData = Seq(
      Row(8, "bat"),
      Row(64, "mouse"),
      Row(-27, "horse")
    )
    val expectedData = Seq(
      Row(8, "bat", "test"),
      Row(64, "mouse", "test"),
      Row(-27, "horse", "test")
    )

    val inputDF = spark.createDataFrame(
      spark.sparkContext.parallelize(inputData),
      StructType(inputSchema)
    )
    val expectedDF = spark.createDataFrame(
      spark.sparkContext.parallelize(expectedData),
      StructType(expectedSchema)
    )

    val actual = transformSomeDf(inputDF)

    assertDataFrameEquals(actual, expectedDF) // equal
  }

  def transformSomeDf(df: DataFrame): DataFrame = {
    df.withColumn("dummyColumn", lit("test"))
  }
}
build.sbt configuration:
name := "SparkTest"
version := "0.1"
scalaVersion := "2.11.8"
val sparkVersion = "2.3.0"
libraryDependencies ++= Seq(
"org.apache.spark" %% "spark-core" % sparkVersion,
"org.apache.spark" %% "spark-sql" % sparkVersion,
"org.apache.spark" %% "spark-hive" % sparkVersion % "provided",
"com.holdenkarau" %% "spark-testing-base" % "2.4.0_0.11.0" % Test
)
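As an alternative to the shell-script check of the written files, the output can also be verified directly from a test by writing to a temporary directory and reading the result back. This is only a sketch (the temporary-directory handling and the partition column are illustrative) and would sit inside another test of the same suite:

import java.nio.file.Files
import org.apache.spark.sql.SaveMode

// Write the transformed frame to a throwaway location ...
val outputDir = Files.createTempDirectory("parquet-test").toString
transformSomeDf(inputDF)
  .write
  .mode(SaveMode.Append)
  .partitionBy("word")
  .parquet(outputDir)

// ... then read it back and assert on what was actually persisted.
val written = spark.read.parquet(outputDir)
written.count() shouldBe inputDF.count()
written.columns should contain("dummyColumn")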
The first thing that comes to my mind when testing DataFrames is to separate the transformations from the IO. So for the above scenario, we can split the chain into three parts:
class Coordinator {
  def transformAndWrite(dataframe: DataFrame): Unit = {
    val transformedDf = dataframe
      .withColumn("date", some_columnar_date_logic)
      .withColumn("hour", some_more_functional_logic)
      // ... a couple more transformation steps

    val partitionedDfWriter = transformedDf.write
      .mode(SaveMode.Append)
      .partitionBy("col1", "col2", "col3")

    partitionedDfWriter.parquet("some hdfs/s3/url")
  }
}
Now we can move these into three separate classes: DFTransformer, DFPartitioner, and DataFrameParquetWriter extends ResourceWriter. So the code becomes something like this:
class DFTransformer {
  def transform(dataframe: DataFrame): DataFrame = {
    dataframe
      .withColumn("date", some_columnar_date_logic)
      .withColumn("hour", some_more_functional_logic)
      // ... a couple more transformation steps
  }
}

class DFPartitioner {
  def partition(dataframe: DataFrame): DataFrameWriter[Row] = {
    dataframe.write
      .mode(SaveMode.Append)
      .partitionBy("col1", "col2", "col3")
  }
}
and

class DataFrameParquetWriter extends ResourceWriter {
  override def write(partitionedDfWriter: DataFrameWriter[Row]): Unit = {
    partitionedDfWriter.parquet("some hdfs/s3/url")
  }
}
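The ResourceWriter trait itself is not shown here; a minimal, hypothetical definition that makes the override above compile could look like this:

// Hypothetical trait; its real definition is not part of the original post.
trait ResourceWriter {
  def write(partitionedDfWriter: DataFrameWriter[Row]): Unit
}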
class Coordinator(dfTransformer: DFTransformer, dfPartitioner: DFPartitioner, resourceWriter: ResourceWriter) {
  def transformAndWrite(dataframe: DataFrame): Unit = {
    val transformedDf = dfTransformer.transform(dataframe)
    val partitionedDfWriter = dfPartitioner.partition(transformedDf)
    resourceWriter.write(partitionedDfWriter)
  }
}
The advantage of the above is that when you have to test your Coordinator class, you can easily mock its dependencies with Mockito.
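A sketch of what such a test could look like, assuming the classes above (including the hypothetical ResourceWriter trait) and mockito-core on the test classpath; only the collaborators are mocked, the small DataFrames are real:

import org.apache.spark.sql.{DataFrameWriter, Row, SparkSession}
import org.mockito.Mockito.{mock, verify, when}
import org.scalatest.FunSuite

class CoordinatorTest extends FunSuite {

  test("Coordinator wires transform, partition and write together") {
    val spark = SparkSession.builder().master("local[1]").appName("coordinator-test").getOrCreate()
    import spark.implicits._

    // Tiny real DataFrames; the collaborators are mocked, so nothing is written anywhere.
    val inputDf       = Seq((1, "a")).toDF("number", "word")
    val transformedDf = Seq((1, "a", "test")).toDF("number", "word", "dummyColumn")
    val dfWriter: DataFrameWriter[Row] = transformedDf.write

    val transformer = mock(classOf[DFTransformer])
    val partitioner = mock(classOf[DFPartitioner])
    val writer      = mock(classOf[ResourceWriter])

    when(transformer.transform(inputDf)).thenReturn(transformedDf)
    when(partitioner.partition(transformedDf)).thenReturn(dfWriter)

    new Coordinator(transformer, partitioner, writer).transformAndWrite(inputDf)

    // The only side effect that matters here: the writer received the prepared DataFrameWriter.
    verify(writer).write(dfWriter)
  }
}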
Testing the DFTransformer is also easy now: you can pass in a stub DataFrame and assert on the returned DataFrame (using spark-testing-base). You can also assert on the columns the transformation returns, as well as on the count, as sketched below.
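For example, a minimal column and count check against DFTransformer could look like this (it assumes transform adds date and hour columns and is row-preserving, as sketched above):

import org.apache.spark.sql.SparkSession
import org.scalatest.{FunSuite, Matchers}

class DFTransformerTest extends FunSuite with Matchers {

  test("transform adds the derived columns and preserves the row count") {
    val spark = SparkSession.builder().master("local[1]").appName("dftransformer-test").getOrCreate()
    import spark.implicits._

    val input = Seq(
      (8, "bat"),
      (64, "mouse")
    ).toDF("number", "word")

    val result = new DFTransformer().transform(input)

    // Column-level assertion: the derived columns are present.
    result.columns should contain allOf ("date", "hour")
    // Count assertion: the transformation does not add or drop rows.
    result.count() shouldBe input.count()
  }
}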