如何在 Spark Scala 中将 org.apache.spark.sql.ColumnName 转换为字符串,小数类型?

How to convert org.apache.spark.sql.ColumnName to string,Decimal type in Spark Scala?

我有一个 JSON 如下

{"name":"method1","parameter1":"P1name","parameter2": 1.0}

我正在加载我的 JSON 文件

val sqlContext = new org.apache.spark.sql.SQLContext(sc)
val df = sqlContext.read.json("C:/Users/test/Desktop/te.txt") 
scala> df.show()
+-------+----------+----------+
|   name|parameter1|parameter2|
+-------+----------+----------+
|method1|    P1name|    1.0   |
+-------+----------+----------+

我有如下功能:

def method1(P1:String, P2:Double)={
     |  print(P1)
         print(P2)
     | }

我在执行下面的代码后根据列名调用我的 method1 它应该执行 method1。

import org.apache.spark.sql.Column
import org.apache.spark.sql.functions._
df.withColumn("methodCalling", when($"name" === "method1", method1($"parameter1",$"parameter2")).otherwise(when($"name" === "method2", method2($"parameter1",$"parameter2")))).show(false)

但是我收到以下错误。

<console>:63: error: type mismatch;
 found   : org.apache.spark.sql.ColumnName
 required: String

请告诉我如何将 org.apache.spark.sql.ColumnName 数据类型转换为 String

你可以这样试试:

scala> def method1(P1:String, P2:Double): Int = {
     |   println(P1)
     |   println(P2)
     |   0
     | }

scala> def method2(P1:String, P2:Double): Int = {
     |   println(P1)
     |   println(P2)
     |   1
     | }

df.withColumn("methodCalling", when($"name" === "method1", method1(df.select($"parameter1").map(_.getString(0)).collect.head,df.select($"parameter2").map(_.getDouble(0)).collect.head))
  .otherwise(when($"name" === "method2", method2(df.select($"parameter1").map(_.getString(0)).collect.head,df.select($"parameter2").map(_.getDouble(0)).collect.head)))).show

//output

P1name
1.0
+-------+----------+----------+-------------+
|   name|parameter1|parameter2|methodCalling|
+-------+----------+----------+-------------+
|method1|    P1name|       1.0|            0|
+-------+----------+----------+-------------+

你必须return你的方法中的一些东西,否则它会重新调整单元并且在打印结果后会出错:

java.lang.RuntimeException: Unsupported literal type class scala.runtime.BoxedUnit ()
  at org.apache.spark.sql.catalyst.expressions.Literal$.apply(literals.scala:75)
  at org.apache.spark.sql.functions$.lit(functions.scala:101)
  at org.apache.spark.sql.functions$.when(functions.scala:1245)
  ... 50 elided

谢谢。

当您将参数作为

传递时
method1($"parameter1",$"parameter2")

您将列传递给函数,而不是原始数据类型。因此,如果您想在函数内部应用 原始数据类型操作 ,我建议您将 method1method2 更改为 udf 函数。 udf 函数必须 return 新列的每一行的值。

import org.apache.spark.sql.functions._
def method1 = udf((P1:String, P2:Double)=>{
  print(P1)
  print(P2)
  P1+P2
})

def method2 = udf((P1:String, P2:Double)=>{
  print(P1)
  print(P2)
  P1+P2
})

那么你的 withColumn api 应该可以正常工作

df.withColumn("methodCalling", when($"name" === "method1", method1($"parameter1",$"parameter2")).otherwise(when($"name" === "method2", method2($"parameter1",$"parameter2")))).show(false)

注意:udf 函数执行数据序列化和反序列化以更改要按行处理的列数据类型,这会增加复杂性和大量内存使用。 spark functions尽量用

我想你只是想阅读 JSON 并基于它调用方法。

由于您已经创建了一个数据框,您可以执行以下操作:

df.map( row => (row.getString(0), row.getString(1) , row.getDouble(2)   ) ).collect
  .foreach { x =>
      x._1.trim.toLowerCase match {
          case "method1" => method1(x._2, x._3) 
        //case "method2" => method2(x._2, x._3)
        //case _ => methodn(x._2, x._3)
      }
   }
// Output : P1name1.0
// Because you used `print` and not `println` ;)