Spark 解析和处理文件 parquet/json

Spark parse and processing file parquet/json

我有一些大数据,采用 parquet 格式。 我想在 Scala 中使用 Spark 来转换这个文件并保存为 json 格式。 我的输入文件看起来像这样:

a1,   a2,  b,  d,    c1,   c2,    c3,   e (header)
1,   null, 2, "1d", "abc", null, null,  7 (row1)
null, 3,   4, "2c", null,  null, "def", 9 (row2)
... more rows to follow

a1a2本质上是同一列,对于每一行,只有一个不为空,c1c2c3,我想用两个转换处理我的输入文件,第一个是为每一行形成正确的 ac 列并删除空值,所以输出看起来像这样:

a,  b,   d,   c,    e (header)
1,  2, "1d", "abc", 7 (row1)
3,  4, "2c", "def", 9 (row2)
... more rows to follow

我想做的第二个转换是:基于列d,如果列d的值落入一组值(“1d”,“12” “a9”,“df”),我想在这一行中再添加一个值(“1f8”),如果没有,添加另一个值(“d8n”),第二次后输出将如下所示转换:

a,  b,   d,   c,    e,   f   (header)
1,  2, "1d", "abc", 7, "1f8" (row1)
3,  4, "2c", "def", 9, "d8n" (row2)
... more rows to follow

注:f列是在d的基础上新增的。

示例数据

scala> df.show(false)
+----+----+---+---+----+----+----+---+
|a1  |a2  |b  |d  |c1  |c2  |c3  |e  |
+----+----+---+---+----+----+----+---+
|1   |null|2  |4  |abc |null|null|7  |
|null|3   |4  |1  |null|null|def |9  |
+----+----+---+---+----+----+----+---+

第一次转换,将 nvl 函数应用于列 a1a2c1c2c3 的动态长度列名。

scala> val columns = (
    df
    .columns
    .filter(_.length > 1)
    .groupBy(c => c(0))
    .map(c => 
        if(c._2.length > 2) 
            s"${c._2.sliding(2).map(c => s"nvl(${c.head},${c.last})").mkString("nvl(",",",")")} as ${c._1}" 
        else 
            s"${c._2.sliding(2).map(c => s"nvl(${c.head},${c.last})").mkString} as ${c._1}"
        ) ++ 
    df.columns.filter(_.length == 1)
).toSeq

以上代码将组合如下列

columns: Seq[String] = List(nvl(a1,a2) as a, nvl(nvl(c1,c2),nvl(c2,c3)) as c, b, d, e)
scala> 
    df
    .selectExpr(columns:_*)
    .withColumn(
        "f",
        when(
            $"d".cast("int").isin(Seq(4,2,8,12):_*),
            lit("1f8")
        ).otherwise("d8n")
    ) // Second Transformation
    .show(false)

最终输出

+---+---+---+---+---+---+
|a  |c  |b  |d  |e  |f  |
+---+---+---+---+---+---+
|1  |abc|2  |4  |7  |1f8|
|3  |def|4  |1  |9  |d8n|
+---+---+---+---+---+---+