Spark 解析和处理文件 parquet/json
Spark parse and processing file parquet/json
我有一些大数据,采用 parquet 格式。
我想在 Scala 中使用 Spark 来转换这个文件并保存为 json 格式。
我的输入文件看起来像这样:
a1, a2, b, d, c1, c2, c3, e (header)
1, null, 2, "1d", "abc", null, null, 7 (row1)
null, 3, 4, "2c", null, null, "def", 9 (row2)
... more rows to follow
a1
和a2
本质上是同一列,对于每一行,只有一个不为空,c1
、c2
和c3
,我想用两个转换处理我的输入文件,第一个是为每一行形成正确的 a
和 c
列并删除空值,所以输出看起来像这样:
a, b, d, c, e (header)
1, 2, "1d", "abc", 7 (row1)
3, 4, "2c", "def", 9 (row2)
... more rows to follow
我想做的第二个转换是:基于列d
,如果列d
的值落入一组值(“1d”,“12” “a9”,“df”),我想在这一行中再添加一个值(“1f8”),如果没有,添加另一个值(“d8n”),第二次后输出将如下所示转换:
a, b, d, c, e, f (header)
1, 2, "1d", "abc", 7, "1f8" (row1)
3, 4, "2c", "def", 9, "d8n" (row2)
... more rows to follow
注:f
列是在d
的基础上新增的。
示例数据
scala> df.show(false)
+----+----+---+---+----+----+----+---+
|a1 |a2 |b |d |c1 |c2 |c3 |e |
+----+----+---+---+----+----+----+---+
|1 |null|2 |4 |abc |null|null|7 |
|null|3 |4 |1 |null|null|def |9 |
+----+----+---+---+----+----+----+---+
第一次转换,将 nvl
函数应用于列 a1
、a2
、c1
、c2
、c3
的动态长度列名。
scala> val columns = (
df
.columns
.filter(_.length > 1)
.groupBy(c => c(0))
.map(c =>
if(c._2.length > 2)
s"${c._2.sliding(2).map(c => s"nvl(${c.head},${c.last})").mkString("nvl(",",",")")} as ${c._1}"
else
s"${c._2.sliding(2).map(c => s"nvl(${c.head},${c.last})").mkString} as ${c._1}"
) ++
df.columns.filter(_.length == 1)
).toSeq
以上代码将组合如下列
columns: Seq[String] = List(nvl(a1,a2) as a, nvl(nvl(c1,c2),nvl(c2,c3)) as c, b, d, e)
scala>
df
.selectExpr(columns:_*)
.withColumn(
"f",
when(
$"d".cast("int").isin(Seq(4,2,8,12):_*),
lit("1f8")
).otherwise("d8n")
) // Second Transformation
.show(false)
最终输出
+---+---+---+---+---+---+
|a |c |b |d |e |f |
+---+---+---+---+---+---+
|1 |abc|2 |4 |7 |1f8|
|3 |def|4 |1 |9 |d8n|
+---+---+---+---+---+---+
我有一些大数据,采用 parquet 格式。 我想在 Scala 中使用 Spark 来转换这个文件并保存为 json 格式。 我的输入文件看起来像这样:
a1, a2, b, d, c1, c2, c3, e (header)
1, null, 2, "1d", "abc", null, null, 7 (row1)
null, 3, 4, "2c", null, null, "def", 9 (row2)
... more rows to follow
a1
和a2
本质上是同一列,对于每一行,只有一个不为空,c1
、c2
和c3
,我想用两个转换处理我的输入文件,第一个是为每一行形成正确的 a
和 c
列并删除空值,所以输出看起来像这样:
a, b, d, c, e (header)
1, 2, "1d", "abc", 7 (row1)
3, 4, "2c", "def", 9 (row2)
... more rows to follow
我想做的第二个转换是:基于列d
,如果列d
的值落入一组值(“1d”,“12” “a9”,“df”),我想在这一行中再添加一个值(“1f8”),如果没有,添加另一个值(“d8n”),第二次后输出将如下所示转换:
a, b, d, c, e, f (header)
1, 2, "1d", "abc", 7, "1f8" (row1)
3, 4, "2c", "def", 9, "d8n" (row2)
... more rows to follow
注:f
列是在d
的基础上新增的。
示例数据
scala> df.show(false)
+----+----+---+---+----+----+----+---+
|a1 |a2 |b |d |c1 |c2 |c3 |e |
+----+----+---+---+----+----+----+---+
|1 |null|2 |4 |abc |null|null|7 |
|null|3 |4 |1 |null|null|def |9 |
+----+----+---+---+----+----+----+---+
第一次转换,将 nvl
函数应用于列 a1
、a2
、c1
、c2
、c3
的动态长度列名。
scala> val columns = (
df
.columns
.filter(_.length > 1)
.groupBy(c => c(0))
.map(c =>
if(c._2.length > 2)
s"${c._2.sliding(2).map(c => s"nvl(${c.head},${c.last})").mkString("nvl(",",",")")} as ${c._1}"
else
s"${c._2.sliding(2).map(c => s"nvl(${c.head},${c.last})").mkString} as ${c._1}"
) ++
df.columns.filter(_.length == 1)
).toSeq
以上代码将组合如下列
columns: Seq[String] = List(nvl(a1,a2) as a, nvl(nvl(c1,c2),nvl(c2,c3)) as c, b, d, e)
scala>
df
.selectExpr(columns:_*)
.withColumn(
"f",
when(
$"d".cast("int").isin(Seq(4,2,8,12):_*),
lit("1f8")
).otherwise("d8n")
) // Second Transformation
.show(false)
最终输出
+---+---+---+---+---+---+
|a |c |b |d |e |f |
+---+---+---+---+---+---+
|1 |abc|2 |4 |7 |1f8|
|3 |def|4 |1 |9 |d8n|
+---+---+---+---+---+---+