如何在 Spark Scala 中将 org.apache.spark.sql.ColumnName 转换为字符串,小数类型?
How to convert org.apache.spark.sql.ColumnName to string,Decimal type in Spark Scala?
我有一个 JSON 如下
{"name":"method1","parameter1":"P1name","parameter2": 1.0}
我正在加载我的 JSON 文件
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
val df = sqlContext.read.json("C:/Users/test/Desktop/te.txt")
scala> df.show()
+-------+----------+----------+
| name|parameter1|parameter2|
+-------+----------+----------+
|method1| P1name| 1.0 |
+-------+----------+----------+
我有如下功能:
def method1(P1:String, P2:Double)={
| print(P1)
print(P2)
| }
我在执行下面的代码后根据列名调用我的 method1 它应该执行 method1。
import org.apache.spark.sql.Column
import org.apache.spark.sql.functions._
df.withColumn("methodCalling", when($"name" === "method1", method1($"parameter1",$"parameter2")).otherwise(when($"name" === "method2", method2($"parameter1",$"parameter2")))).show(false)
但是我收到以下错误。
<console>:63: error: type mismatch;
found : org.apache.spark.sql.ColumnName
required: String
请告诉我如何将 org.apache.spark.sql.ColumnName 数据类型转换为 String
你可以这样试试:
scala> def method1(P1:String, P2:Double): Int = {
| println(P1)
| println(P2)
| 0
| }
scala> def method2(P1:String, P2:Double): Int = {
| println(P1)
| println(P2)
| 1
| }
df.withColumn("methodCalling", when($"name" === "method1", method1(df.select($"parameter1").map(_.getString(0)).collect.head,df.select($"parameter2").map(_.getDouble(0)).collect.head))
.otherwise(when($"name" === "method2", method2(df.select($"parameter1").map(_.getString(0)).collect.head,df.select($"parameter2").map(_.getDouble(0)).collect.head)))).show
//output
P1name
1.0
+-------+----------+----------+-------------+
| name|parameter1|parameter2|methodCalling|
+-------+----------+----------+-------------+
|method1| P1name| 1.0| 0|
+-------+----------+----------+-------------+
你必须return你的方法中的一些东西,否则它会重新调整单元并且在打印结果后会出错:
java.lang.RuntimeException: Unsupported literal type class scala.runtime.BoxedUnit ()
at org.apache.spark.sql.catalyst.expressions.Literal$.apply(literals.scala:75)
at org.apache.spark.sql.functions$.lit(functions.scala:101)
at org.apache.spark.sql.functions$.when(functions.scala:1245)
... 50 elided
谢谢。
当您将参数作为
传递时
method1($"parameter1",$"parameter2")
您将列传递给函数,而不是原始数据类型。因此,如果您想在函数内部应用 原始数据类型操作 ,我建议您将 method1
和 method2
更改为 udf
函数。 udf
函数必须 return 新列的每一行的值。
import org.apache.spark.sql.functions._
def method1 = udf((P1:String, P2:Double)=>{
print(P1)
print(P2)
P1+P2
})
def method2 = udf((P1:String, P2:Double)=>{
print(P1)
print(P2)
P1+P2
})
那么你的 withColumn
api 应该可以正常工作
df.withColumn("methodCalling", when($"name" === "method1", method1($"parameter1",$"parameter2")).otherwise(when($"name" === "method2", method2($"parameter1",$"parameter2")))).show(false)
注意:udf 函数执行数据序列化和反序列化以更改要按行处理的列数据类型,这会增加复杂性和大量内存使用。 spark functions尽量用
我想你只是想阅读 JSON 并基于它调用方法。
由于您已经创建了一个数据框,您可以执行以下操作:
df.map( row => (row.getString(0), row.getString(1) , row.getDouble(2) ) ).collect
.foreach { x =>
x._1.trim.toLowerCase match {
case "method1" => method1(x._2, x._3)
//case "method2" => method2(x._2, x._3)
//case _ => methodn(x._2, x._3)
}
}
// Output : P1name1.0
// Because you used `print` and not `println` ;)
我有一个 JSON 如下
{"name":"method1","parameter1":"P1name","parameter2": 1.0}
我正在加载我的 JSON 文件
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
val df = sqlContext.read.json("C:/Users/test/Desktop/te.txt")
scala> df.show()
+-------+----------+----------+
| name|parameter1|parameter2|
+-------+----------+----------+
|method1| P1name| 1.0 |
+-------+----------+----------+
我有如下功能:
def method1(P1:String, P2:Double)={
| print(P1)
print(P2)
| }
我在执行下面的代码后根据列名调用我的 method1 它应该执行 method1。
import org.apache.spark.sql.Column
import org.apache.spark.sql.functions._
df.withColumn("methodCalling", when($"name" === "method1", method1($"parameter1",$"parameter2")).otherwise(when($"name" === "method2", method2($"parameter1",$"parameter2")))).show(false)
但是我收到以下错误。
<console>:63: error: type mismatch;
found : org.apache.spark.sql.ColumnName
required: String
请告诉我如何将 org.apache.spark.sql.ColumnName 数据类型转换为 String
你可以这样试试:
scala> def method1(P1:String, P2:Double): Int = {
| println(P1)
| println(P2)
| 0
| }
scala> def method2(P1:String, P2:Double): Int = {
| println(P1)
| println(P2)
| 1
| }
df.withColumn("methodCalling", when($"name" === "method1", method1(df.select($"parameter1").map(_.getString(0)).collect.head,df.select($"parameter2").map(_.getDouble(0)).collect.head))
.otherwise(when($"name" === "method2", method2(df.select($"parameter1").map(_.getString(0)).collect.head,df.select($"parameter2").map(_.getDouble(0)).collect.head)))).show
//output
P1name
1.0
+-------+----------+----------+-------------+
| name|parameter1|parameter2|methodCalling|
+-------+----------+----------+-------------+
|method1| P1name| 1.0| 0|
+-------+----------+----------+-------------+
你必须return你的方法中的一些东西,否则它会重新调整单元并且在打印结果后会出错:
java.lang.RuntimeException: Unsupported literal type class scala.runtime.BoxedUnit ()
at org.apache.spark.sql.catalyst.expressions.Literal$.apply(literals.scala:75)
at org.apache.spark.sql.functions$.lit(functions.scala:101)
at org.apache.spark.sql.functions$.when(functions.scala:1245)
... 50 elided
谢谢。
当您将参数作为
传递时method1($"parameter1",$"parameter2")
您将列传递给函数,而不是原始数据类型。因此,如果您想在函数内部应用 原始数据类型操作 ,我建议您将 method1
和 method2
更改为 udf
函数。 udf
函数必须 return 新列的每一行的值。
import org.apache.spark.sql.functions._
def method1 = udf((P1:String, P2:Double)=>{
print(P1)
print(P2)
P1+P2
})
def method2 = udf((P1:String, P2:Double)=>{
print(P1)
print(P2)
P1+P2
})
那么你的 withColumn
api 应该可以正常工作
df.withColumn("methodCalling", when($"name" === "method1", method1($"parameter1",$"parameter2")).otherwise(when($"name" === "method2", method2($"parameter1",$"parameter2")))).show(false)
注意:udf 函数执行数据序列化和反序列化以更改要按行处理的列数据类型,这会增加复杂性和大量内存使用。 spark functions尽量用
我想你只是想阅读 JSON 并基于它调用方法。
由于您已经创建了一个数据框,您可以执行以下操作:
df.map( row => (row.getString(0), row.getString(1) , row.getDouble(2) ) ).collect
.foreach { x =>
x._1.trim.toLowerCase match {
case "method1" => method1(x._2, x._3)
//case "method2" => method2(x._2, x._3)
//case _ => methodn(x._2, x._3)
}
}
// Output : P1name1.0
// Because you used `print` and not `println` ;)