如何在单个查询中为不同类型的列计算流式数据帧的统计信息?
How to compute statistics on a streaming dataframe for different type of columns in a single query?
我有一个包含三列时间、col1、col2 的流式数据帧。
+-----------------------+-------------------+--------------------+
|time |col1 |col2 |
+-----------------------+-------------------+--------------------+
|2018-01-10 15:27:21.289|0.4988615628926717 |0.1926744113882285 |
|2018-01-10 15:27:22.289|0.5430687338123434 |0.17084552928040175 |
|2018-01-10 15:27:23.289|0.20527770821641478|0.2221980020202523 |
|2018-01-10 15:27:24.289|0.130852802747647 |0.5213147910202641 |
+-----------------------+-------------------+--------------------+
col1 和col2 的数据类型是可变的。它可以是字符串或数字数据类型。
所以我必须计算每一列的统计数据。
对于字符串列,只计算有效计数和无效计数。
对于时间戳列,仅计算最小值和最大值。
对于数字类型的列,计算最小值、最大值、平均值和平均值。
我必须在单个查询中计算所有统计信息。
现在,我已经为每种类型的列分别计算了三个查询。
列举你想要的案例select。例如,如果流定义为:
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.Column
val schema = StructType(Seq(
StructField("v", TimestampType),
StructField("x", IntegerType),
StructField("y", StringType),
StructField("z", DecimalType(10, 2))
))
val df = spark.readStream.schema(schema).format("csv").load("/tmp/foo")
结果会是
val stats = df.select(df.dtypes.flatMap {
case (c, "StringType") =>
Seq(count(c) as s"valid_${c}", count("*") - count(c) as s"invalid_${c}")
case (c, t) if Seq("TimestampType", "DateType") contains t =>
Seq(min(c), max(c))
case (c, t) if (Seq("FloatType", "DoubleType", "IntegerType") contains t) || t.startsWith("DecimalType") =>
Seq(min(c), max(c), avg(c), stddev(c))
case _ => Seq.empty[Column]
}: _*)
// root
// |-- min(v): timestamp (nullable = true)
// |-- max(v): timestamp (nullable = true)
// |-- min(x): integer (nullable = true)
// |-- max(x): integer (nullable = true)
// |-- avg(x): double (nullable = true)
// |-- stddev_samp(x): double (nullable = true)
// |-- valid_y: long (nullable = false)
// |-- invalid_y: long (nullable = false)
// |-- min(z): decimal(10,2) (nullable = true)
// |-- max(z): decimal(10,2) (nullable = true)
// |-- avg(z): decimal(14,6) (nullable = true)
// |-- stddev_samp(z): double (nullable = true)
我有一个包含三列时间、col1、col2 的流式数据帧。
+-----------------------+-------------------+--------------------+
|time |col1 |col2 |
+-----------------------+-------------------+--------------------+
|2018-01-10 15:27:21.289|0.4988615628926717 |0.1926744113882285 |
|2018-01-10 15:27:22.289|0.5430687338123434 |0.17084552928040175 |
|2018-01-10 15:27:23.289|0.20527770821641478|0.2221980020202523 |
|2018-01-10 15:27:24.289|0.130852802747647 |0.5213147910202641 |
+-----------------------+-------------------+--------------------+
col1 和col2 的数据类型是可变的。它可以是字符串或数字数据类型。 所以我必须计算每一列的统计数据。 对于字符串列,只计算有效计数和无效计数。 对于时间戳列,仅计算最小值和最大值。 对于数字类型的列,计算最小值、最大值、平均值和平均值。 我必须在单个查询中计算所有统计信息。 现在,我已经为每种类型的列分别计算了三个查询。
列举你想要的案例select。例如,如果流定义为:
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.Column
val schema = StructType(Seq(
StructField("v", TimestampType),
StructField("x", IntegerType),
StructField("y", StringType),
StructField("z", DecimalType(10, 2))
))
val df = spark.readStream.schema(schema).format("csv").load("/tmp/foo")
结果会是
val stats = df.select(df.dtypes.flatMap {
case (c, "StringType") =>
Seq(count(c) as s"valid_${c}", count("*") - count(c) as s"invalid_${c}")
case (c, t) if Seq("TimestampType", "DateType") contains t =>
Seq(min(c), max(c))
case (c, t) if (Seq("FloatType", "DoubleType", "IntegerType") contains t) || t.startsWith("DecimalType") =>
Seq(min(c), max(c), avg(c), stddev(c))
case _ => Seq.empty[Column]
}: _*)
// root
// |-- min(v): timestamp (nullable = true)
// |-- max(v): timestamp (nullable = true)
// |-- min(x): integer (nullable = true)
// |-- max(x): integer (nullable = true)
// |-- avg(x): double (nullable = true)
// |-- stddev_samp(x): double (nullable = true)
// |-- valid_y: long (nullable = false)
// |-- invalid_y: long (nullable = false)
// |-- min(z): decimal(10,2) (nullable = true)
// |-- max(z): decimal(10,2) (nullable = true)
// |-- avg(z): decimal(14,6) (nullable = true)
// |-- stddev_samp(z): double (nullable = true)