从 Spark Scala 中存在的 Dataframe 创建动态查询
Create dynamic query from the Dataframe present in Spark Scala
我有一个数据框 DF,如下所示。基于 Issue 列和 Datatype 列,我想创建一个动态查询。
如果问题列是 YES 然后检查数据类型,如果它的 StringType 添加 Trim(DiffColumnName) 到查询或者如果数据类型是整数,则执行一些其他操作,例如 round(COUNT,2)
对于问题类型为 NO 的列,什么也不做,select 列本身
查询应该是这样的
Select DEST_COUNTRY_NAME, trim(ORIGIN_COUNTRY_NAME),round(COUNT,2)
+-------------------+-----------+-----+
| DiffColumnName| Datatype|Issue|
+-------------------+-----------+-----+
| DEST_COUNTRY_NAME| StringType| NO|
|ORIGIN_COUNTRY_NAME| StringType| YES|
| COUNT|IntegerType| YES|
+-------------------+-----------+-----+
我不确定我是否应该在这里使用 If else 条件或 case 语句或创建 UDF。我的数据框(即列)也是动态的,每次都会更改。
需要一些如何在此处进行操作的建议。谢谢
这可以使用以下代码完成。
- 通过应用所需的操作导出新列
- 使用collect_list将值聚合到一个数组
- 使用 concat_ws 和 concat
格式化输出
val origDF=Seq(("DEST_COUNTRY_NAME","StringType","NO"),
("ORIGIN_COUNTRY_NAME","StringType","YES"),
("COUNT","IntegerType","YES"),
("TESTCOL","StringType","NO")
).toDF("DiffColumnName","Datatype","Issue")
val finalDF=origDF.withColumn("newCol",when(col("Issue")==="YES" && col("DataType")==="StringType",concat(lit("trim("),col("DiffColumnName"),lit(")")))
when(col("Issue")==="YES" && col("DataType")==="IntegerType",concat(lit("round("),col("DiffColumnName"),lit(",2)")))
when(col("Issue")==="NO",col("DiffColumnName"))
)
finalDF.agg(collect_list("newCol").alias("queryout")).select(concat(lit("select "),concat_ws(",",col("queryout")))).show(false)
我在数据中添加了一个额外的列用于测试,它给了我想要的输出。
+-------------------------------------------------------------------------+
|concat(select , concat_ws(,, queryout)) |
+-------------------------------------------------------------------------+
|select DEST_COUNTRY_NAME,trim(ORIGIN_COUNTRY_NAME),round(COUNT,2),TESTCOL|
+-------------------------------------------------------------------------+
我有一个数据框 DF,如下所示。基于 Issue 列和 Datatype 列,我想创建一个动态查询。 如果问题列是 YES 然后检查数据类型,如果它的 StringType 添加 Trim(DiffColumnName) 到查询或者如果数据类型是整数,则执行一些其他操作,例如 round(COUNT,2) 对于问题类型为 NO 的列,什么也不做,select 列本身
查询应该是这样的
Select DEST_COUNTRY_NAME, trim(ORIGIN_COUNTRY_NAME),round(COUNT,2)
+-------------------+-----------+-----+
| DiffColumnName| Datatype|Issue|
+-------------------+-----------+-----+
| DEST_COUNTRY_NAME| StringType| NO|
|ORIGIN_COUNTRY_NAME| StringType| YES|
| COUNT|IntegerType| YES|
+-------------------+-----------+-----+
我不确定我是否应该在这里使用 If else 条件或 case 语句或创建 UDF。我的数据框(即列)也是动态的,每次都会更改。
需要一些如何在此处进行操作的建议。谢谢
这可以使用以下代码完成。
- 通过应用所需的操作导出新列
- 使用collect_list将值聚合到一个数组
- 使用 concat_ws 和 concat 格式化输出
val origDF=Seq(("DEST_COUNTRY_NAME","StringType","NO"),
("ORIGIN_COUNTRY_NAME","StringType","YES"),
("COUNT","IntegerType","YES"),
("TESTCOL","StringType","NO")
).toDF("DiffColumnName","Datatype","Issue")
val finalDF=origDF.withColumn("newCol",when(col("Issue")==="YES" && col("DataType")==="StringType",concat(lit("trim("),col("DiffColumnName"),lit(")")))
when(col("Issue")==="YES" && col("DataType")==="IntegerType",concat(lit("round("),col("DiffColumnName"),lit(",2)")))
when(col("Issue")==="NO",col("DiffColumnName"))
)
finalDF.agg(collect_list("newCol").alias("queryout")).select(concat(lit("select "),concat_ws(",",col("queryout")))).show(false)
我在数据中添加了一个额外的列用于测试,它给了我想要的输出。
+-------------------------------------------------------------------------+
|concat(select , concat_ws(,, queryout)) |
+-------------------------------------------------------------------------+
|select DEST_COUNTRY_NAME,trim(ORIGIN_COUNTRY_NAME),round(COUNT,2),TESTCOL|
+-------------------------------------------------------------------------+