如何从文本文件(字符串类型数据)中读取映射并将数据动态加载到 Spark scala 中的镶木地板格式(具有不同数据类型的多列)

How to read from textfile(String type data) map and load data into parquet format(multiple columns with different datatype) in Spark scala dynamically

我们正在使用 sqoop 作为文本文件格式将数据从源 RDBMS 系统导入到 hadoop 环境。并且此文本文件需要加载到 parquet 格式的配置单元 table 中。 我们如何在不使用 Hive 支持的情况下处理这种情况(之前我们使用直线插入,我们设计不再使用 hive)并使用 parquet 直接写入 HDFS。

例如:- 在 sqoop 导入之后,假设我们在 HDFS 目标目录下有文件。 /data/loc/mydb/Mytable

Mytable 中的数据并且都是字符串类型。

-----------------------------------------
10|customer1|10.0|2016-09-07  08:38:00.0
20|customer2|20.0|2016-09-08  10:45:00.0
30|customer3|30.0|2016-09-10  03:26:00.0
------------------------------------------

目标 Hive table 架构。

rec_id: int
rec_name: String
rec_value: Decimal(2,1)
rec_created: Timestamp

我们如何使用 spark 并动态管理所有列的类型转换,将数据从 Mytable 加载到目标底层 Hive table 位置(parquet 格式)。

请注意:我们不能在这里使用 HiveContext。 非常感谢该方法中的任何帮助。提前致谢。

下面的示例读取了一个 .csv 文件,其格式与问题中显示的格式相同。

有些细节我想先说明一下。

在 table 模式中,字段:rec_value: Decimal(2,1) 必须是 rec_value: Decimal(3,1),原因如下:

DECIMAL类型表示固定precisionscale的数字。 创建 DECIMAL 列时,指定 precision、p 和 scale、s。 Precision是总位数,与小数点位置无关。 Scale是小数点后的位数。 要在不损失精度的情况下表示数字 10.0,您需要一个 DECIMAL 类型 precision 至少为 3,并且 scale 至少为 1。

所以 Hive table 将是:

CREATE TABLE tab_data (
  rec_id INT,
  rec_name STRING,
  rec_value DECIMAL(3,1),
  rec_created TIMESTAMP
) STORED AS PARQUET;

完整的 Scala 代码

import org.apache.spark.sql.{SaveMode, SparkSession}
import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.types.{DataTypes, IntegerType, StringType, StructField, StructType, TimestampType}

object CsvToParquet {

  val spark = SparkSession
    .builder()
    .appName("CsvToParquet")
    .master("local[*]")
    .config("spark.sql.shuffle.partitions","200") //Change to a more reasonable default number of partitions for our data
    .config("spark.sql.parquet.writeLegacyFormat", true) // To avoid issues with data type between Spark and Hive
                                                         // The convention used by Spark to write Parquet data is configurable.
                                                         // This is determined by the property spark.sql.parquet.writeLegacyFormat
                                                         // The default value is false. If set to "true",
                                                         // Spark will use the same convention as Hive for writing the Parquet data.
    .getOrCreate()

  val sc = spark.sparkContext

  val inputPath = "hdfs://host:port/user/...../..../tab_data.csv"
  val outputPath = "hdfs://host:port/user/hive/warehouse/test.db/tab_data"

  def main(args: Array[String]): Unit = {

    Logger.getRootLogger.setLevel(Level.ERROR)

    try {

      val DecimalType = DataTypes.createDecimalType(3, 1)

      /**
        * the data schema
        */
      val schema = StructType(List(StructField("rec_id", IntegerType, true), StructField("rec_name",StringType, true),
                   StructField("rec_value",DecimalType),StructField("rec_created",TimestampType, true)))

      /**
        * Reading the data from HDFS as .csv text file
        */
      val data = spark
        .read
        .option("sep","|")
        .option("timestampFormat","yyyy-MM-dd HH:mm:ss.S")
        .option("inferSchema",false)
        .schema(schema)
        .csv(inputPath)

       data.show(truncate = false)
       data.schema.printTreeString()

      /**
        * Writing the data as Parquet file
        */
      data
        .write
        .mode(SaveMode.Append)
        .option("compression", "none") // Assuming no data compression
        .parquet(outputPath)

    } finally {
      sc.stop()
      println("SparkContext stopped")
      spark.stop()
      println("SparkSession stopped")
    }
  }
}

输入文件为 .csv 制表符分隔字段

10|customer1|10.0|2016-09-07  08:38:00.0
20|customer2|24.0|2016-09-08  10:45:00.0
30|customer3|35.0|2016-09-10  03:26:00.0
40|customer1|46.0|2016-09-11  08:38:00.0
........

正在阅读 Spark

+------+---------+---------+-------------------+
|rec_id|rec_name |rec_value|rec_created        |
+------+---------+---------+-------------------+
|10    |customer1|10.0     |2016-09-07 08:38:00|
|20    |customer2|24.0     |2016-09-08 10:45:00|
|30    |customer3|35.0     |2016-09-10 03:26:00|
|40    |customer1|46.0     |2016-09-11 08:38:00|
......

架构

root
 |-- rec_id: integer (nullable = true)
 |-- rec_name: string (nullable = true)
 |-- rec_value: decimal(3,1) (nullable = true)
 |-- rec_created: timestamp (nullable = true)

正在阅读 Hive

SELECT *
FROM tab_data;

+------------------+--------------------+---------------------+------------------------+--+
| tab_data.rec_id  | tab_data.rec_name  | tab_data.rec_value  |  tab_data.rec_created  |
+------------------+--------------------+---------------------+------------------------+--+
| 10               | customer1          | 10                  | 2016-09-07 08:38:00.0  |
| 20               | customer2          | 24                  | 2016-09-08 10:45:00.0  |
| 30               | customer3          | 35                  | 2016-09-10 03:26:00.0  |
| 40               | customer1          | 46                  | 2016-09-11 08:38:00.0  |
.....

希望这对您有所帮助。