Apache Spark - Generic method for loading csv data to dataset
I want to write a generic method with three input parameters:
- file path - String
- schema - ?
- case class
So, my idea is to write a method like this:
def load_sms_ds(filePath: String, schemaInfo: ?, cc: ?) = {
  val ds = spark.read
    .format("csv")
    .option("header", "true")
    .schema(?)
    .option("delimiter", ",")
    .option("dateFormat", "yyyy-MM-dd HH:mm:ss.SSS")
    .load(filePath)
    .as[?]
  ds
}
and have it return a dataset that depends on the input parameters. I'm not sure what types the parameters schemaInfo and cc should be.
To start with, I recommend reading the Spark SQL programming guide. It contains things that I think will generally be helpful as you learn Spark.
Let's run through the process of reading a csv file with the schema defined using a case class.
First, add the various imports needed for this example:
import java.io.{File, PrintWriter} // for reading / writing the example data
import org.apache.spark.sql.types.StructType // to define the schema
import org.apache.spark.sql.catalyst.ScalaReflection // used to generate the schema from a case class
import scala.reflect.runtime.universe.TypeTag // used to provide type information of the case class at runtime
import org.apache.spark.sql.{Dataset, SparkSession}
import org.apache.spark.sql.Encoder // needed by .as[T] to create the Dataset[T]
Define a case class; the different types available can be found here:
case class Example(
  stringField: String,
  intField: Int,
  doubleField: Double
)
Add a method for extracting the schema (a StructType) given the case class type as a parameter:
// T : TypeTag means that an implicit value of type TypeTag[T] must be available at the method
// call site. Scala will automatically generate this for you. See here for further details.
def schemaOf[T: TypeTag]: StructType = {
  ScalaReflection
    .schemaFor[T] // this method requires a TypeTag for T
    .dataType
    .asInstanceOf[StructType] // cast it to a StructType, which is what spark requires as its schema
}
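As a quick check (not part of the original answer), you can print the schema derived for the Example case class above; the output should look roughly like this:

println(schemaOf[Example].treeString)
// roughly:
// root
//  |-- stringField: string (nullable = true)
//  |-- intField: integer (nullable = false)
//  |-- doubleField: double (nullable = false)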
Define a method that reads a csv file from a path, with the schema defined using the case class:
// The implicit Encoder is needed by the `.as` method in order to create the Dataset[T]. The TypeTag is required by the schemaOf[T] call.
def readCSV[T : Encoder : TypeTag](
    filePath: String
)(implicit spark: SparkSession): Dataset[T] = {
  spark.read
    .option("header", "true")
    .option("dateFormat", "yyyy-MM-dd HH:mm:ss.SSS")
    .schema(schemaOf[T])
    .csv(filePath) // spark provides this more explicit call to read a csv file; by default it uses comma as the separator, but this can be changed
    .as[T]
}
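The question's snippet also set a "delimiter" option explicitly. If you need that flexibility, one way (sketched here as an assumption, not part of the original answer) is a hypothetical variant that forwards arbitrary reader options:

// Hypothetical variant of readCSV that forwards extra reader options,
// e.g. a non-default delimiter. The name and signature are illustrative only.
def readCSVWithOptions[T : Encoder : TypeTag](
    filePath: String,
    options: Map[String, String] = Map("header" -> "true")
)(implicit spark: SparkSession): Dataset[T] = {
  spark.read
    .options(options) // e.g. Map("header" -> "true", "delimiter" -> ";")
    .schema(schemaOf[T])
    .csv(filePath)
    .as[T]
}

It would be called the same way as readCSV, e.g. readCSVWithOptions[Example](file.getPath, Map("header" -> "true", "delimiter" -> ",")).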
Create a SparkSession:
implicit val spark = SparkSession.builder().master("local").getOrCreate()
Write some sample data to a temporary file:
val data =
s"""|stringField,intField,doubleField
|hello,1,1.0
|world,2,2.0
|""".stripMargin
val file = File.createTempFile("test",".csv")
val pw = new PrintWriter(file)
pw.write(data)
pw.close()
An example of calling this method:
import spark.implicits._ // so that an implicit Encoder gets pulled in for the case class
val df = readCSV[Example](file.getPath)
df.show()
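For the two sample rows written above, df.show() should print something like:

+-----------+--------+-----------+
|stringField|intField|doubleField|
+-----------+--------+-----------+
|      hello|       1|        1.0|
|      world|       2|        2.0|
+-----------+--------+-----------+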