如何对 Json 文件执行基本统计以探索我的数字和非数字变量?

How to perform basic statistics on a Json file to explore my numeric and non-numeric variable?

我导入了一个 Json 文件,该文件具有以下架构:

sqlContext.read.json("filename").printSchema 
   root 
 |-- COL: long (nullable = true) 
 |-- DATA: array (nullable = true) 
 |    |-- element: struct (containsNull = true) 
 |    |    |-- Crate: string (nullable = true) 
 |    |    |-- MLrate: string (nullable = true) 
 |    |    |-- Nrout: string (nullable = true) 
 |    |    |-- up: string (nullable = true) 
 |-- IFAM: string (nullable = true) 
 |-- KTM: long (nullable = true) 

我是 Spark 的新手,我想执行基本统计,例如

我的问题是:

How to change the type of my variables in my schema, from 'string' to 'numeric' ? (Crate, MLrate and Nrout should be numeric variables) ?

您可以手动创建模式并将其应用于现有的 RDD。我假设您的数据存储在 df 变量中,并且与您上一个问题的示例具有相同的结构:

import org.apache.spark.sql.types._
import org.apache.spark.sql.Row

val schema = StructType(
    StructField("COL", LongType, true) ::
    StructField("DATA", ArrayType(StructType(
        StructField("Crate", IntegerType, true) ::
        StructField("MLrate", IntegerType, true) ::
        StructField("Nrout", IntegerType, true) ::
        StructField("up", IntegerType, true) ::
        Nil
    ), true), true) :: 
    StructField("IFAM", StringType, true) :: 
    StructField("KTM", LongType, true) :: 
    Nil
)

def convert(row: Row) = {
    val col = row.get(0)
    val data: Seq[Row] = row.getSeq(1)
    val rowData = data.map(r => Row.fromSeq(r.toSeq.map{
        case v: String => v.toInt
        case _ => null
    })).toList
    val ifam = row.get(2)
    val ktm = row.get(3)
    Row(col, rowData, ifam, ktm)
}

val updatedDf = sqlContext.applySchema(df.rdd.map(convert), schema)
updatedDf.printSchema

我们得到了预期的输出:

root
 |-- COL: long (nullable = true)
 |-- DATA: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- Crate: integer (nullable = true)
 |    |    |-- MLrate: integer (nullable = true)
 |    |    |-- Nrout: integer (nullable = true)
 |    |    |-- up: integer (nullable = true)
 |-- IFAM: string (nullable = true)
 |-- KTM: long (nullable = true)

您可以根据需要调整数值类型(DecimalTypeDoubleType)。

getting the min, max, mean, median and std of numeric variables How to do those basic statistics easily ?

获取数字变量统计信息的最简单方法是使用describe方法:

updatedDf.describe().show

你会得到格式良好的输出:

+-------+----+-------------+
|summary| COL|          KTM|
+-------+----+-------------+
|  count|   2|            2|
|   mean|21.5| 1.4300064E12|
| stddev| 0.5|         null|
|    min|  21|1430006400000|
|    max|  22|1430006400000|
+-------+----+-------------+

如果您需要可以编程方式访问的输出,您可以 org.apache.spark.sql.functions 导入 org.apache.spark.sql.functions

import org.apache.spark.sql.functions._

df.agg(
    min("KTM").alias("KTM_min"),
    max("KTM").alias("KTM_max"),
    mean("KTM").alias("KTM_mean")).show
上面的

None 将与数组字段一起使用。要使用这些,您可能需要一个 udf 或首先展平您的结构。

val flattenedSchema = StructType(
    StructField("COL", LongType, true) ::
    StructField("Crate", IntegerType, true) ::
    StructField("MLrate", IntegerType, true) ::
    StructField("Nrout", IntegerType, true) ::
    StructField("up", IntegerType, true) ::
    StructField("IFAM", StringType, true) :: 
    StructField("KTM", LongType, true) :: 
    Nil
)

def flatten(row: Row) = {
    val col = row.get(0)
    val data: Seq[Row] = row.getSeq(1)
    val ifam = row.get(2)
    val ktm = row.get(3)

    data.map(dat => {
        val crate = dat.get(0)
        val mlrate = dat.get(1)
        val nrout = dat.get(2)
        val up = dat.get(3)
        Row(col, crate, mlrate, nrout, up, ifam, ktm)
    })
}

val updatedFlatDf = sqlContext.
    applySchema(updatedDf.rdd.flatMap(flatten), flattenedSchema)

updatedFlatDf.describe().show

现在您可以统计每个字段:

+-------+----+------------------+------------------+------------------+----+-------------+
|summary| COL|             Crate|            MLrate|             Nrout|  up|          KTM|
+-------+----+------------------+------------------+------------------+----+-------------+
|  count|  12|                12|                12|                12|   0|           12|
|   mean|21.5|2.1666666666666665|             31.75|2.3333333333333335|null| 1.4300064E12|
| stddev| 0.5|1.2133516482134201|2.5535922410074345| 3.223179934302286|null|         null|
|    min|  21|                 0|                30|                 0|null|1430006400000|
|    max|  22|                 5|                38|                 8|null|1430006400000|
+-------+----+------------------+------------------+------------------+----+-------------+

getting the min, max, mean, median and std of numeric variables

获取分位数(包括中位数)对于大型数据集来说通常非常昂贵。如果你真的必须计算中位数,你可以 for useful. It is written in Python but pretty easy to implement in Scala as well. A little bit less comprehensive answer in Scala has been provided by Eugene Zhulenev .

编辑

如果您想将 nrout 转换为日期,您可以将 convert 中的 rowData 替换为如下内容:

val rowData = data.map(dat => {
    val crate = dat.get(0).toInt
    val mlrate = dat.get(1).toInt
    val nrout = java.sql.Timestamp.valueOf(dat.get(2))
    val up = dat.get(3).toInt
    Row(crate, mlrate, nrout, up)
})

并调整模式:

StructField("Nrout", TimestampType, true)