Retrieve array stored in the dataframe column for each row using Scala Spark
I have the following dataframe:
+-------------------+------------------------+
|value              |feeling                 |
+-------------------+------------------------+
|Sam got these marks|[sad, sad, disappointed]|
|I got good marks   |[happy, excited, happy] |
+-------------------+------------------------+
I want to iterate over this dataframe, get the array stored in the feeling column of each row, and pass that array to some calculation method.
def calculationMethod(arrayValue: Array[String]): String = {
  // get the "average" of the words
  ???
}
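A minimal sketch of one possible implementation, assuming the "average" of the array is its most frequent word (which matches the expected output below):

def calculationMethod(arrayValue: Array[String]): String =
  // pick the word that occurs most often in the array
  arrayValue.groupBy(identity).maxBy(_._2.length)._1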
Output dataframe:
+-------------------+------------------------+-------+
|value              |feeling                 |average|
+-------------------+------------------------+-------+
|Sam got these marks|[sad, sad, disappointed]|sad    |
|I got good marks   |[happy, excited, happy] |happy  |
+-------------------+------------------------+-------+
I'm not sure how to iterate over each row and get the array in the second column so that it can be passed into the method I wrote. Also note that the dataframe shown in the question is a streaming dataframe.
Edit 1
val calculateUDF = udf(calculationMethod _)
val editedDataFrame = filteredDataFrame.withColumn("average", calculateUDF(col("feeling")))

def calculationMethod(emojiArray: Seq[String]): DataFrame = {
  val existingSparkSession = SparkSession.builder().getOrCreate()
  import existingSparkSession.implicits._
  val df = emojiArray.toDF("feeling")
  val result = df.selectExpr(
    "feeling",
    "'U+' || trim('0', string(hex(encode(feeling, 'utf-32')))) as unicode"
  )
  result
}
I get the following error:
Schema for type org.apache.spark.sql.Dataset[org.apache.spark.sql.Row]
is not supported
Note that the initial dataframe mentioned in the question is a streaming dataframe.
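The error occurs because a Spark UDF must return a value Spark has an encoder for (String, Seq[String], a case class, and so on); it cannot return a Dataset or DataFrame. A minimal sketch of the method above with a supported return type, assuming each emoji is a single Unicode codepoint:

def calculationMethod(emojiArray: Seq[String]): Seq[String] = {
  // Map each emoji to its "U+XXXX" codepoint string. Seq[String] has a
  // Spark encoder, so udf(calculationMethod _) can derive a schema for it.
  emojiArray.map(e => "U+" + Integer.toHexString(e.codePointAt(0)).toUpperCase)
}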
Edit 2
This is the final dataframe I'm expecting:
+-------------------+------------+-------------------------+
|value              |feeling     |unicode                  |
+-------------------+------------+-------------------------+
|Sam got these marks|[😀, 😆, 😁]|[U+1F600 U+1F606 U+1F601]|
|I got good marks   |[😄, 🙃]    |[U+1F604 U+1F643]        |
+-------------------+------------+-------------------------+
You can transform the array with Spark's higher-order transform function instead of using a UDF. For each element x, encode(x, 'utf-32') produces the 4-byte codepoint, hex renders those bytes as a string such as 0001F600, ltrim('0', ...) strips the leading zeros, and 'U+' || ... adds the prefix:
import org.apache.spark.sql.functions.expr

val df2 = df.withColumn(
  "unicode",
  expr("transform(feeling, x -> 'U+' || ltrim('0', string(hex(encode(x, 'utf-32')))))")
)
df2.show(false)
+-------------------+------------+---------------------------+
|value              |feeling     |unicode                    |
+-------------------+------------+---------------------------+
|Sam got these marks|[😀, 😆, 😁]|[U+1F600, U+1F606, U+1F601]|
|I got good marks   |[😄, 🙃]    |[U+1F604, U+1F643]         |
+-------------------+------------+---------------------------+
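On Spark 3.0+ the same conversion can also be written with the typed column API instead of a SQL expression string; a minimal sketch, assuming the same df:

import org.apache.spark.sql.functions._

// transform(col, fn) is the typed counterpart of the SQL higher-order
// function: encode each element to UTF-32, hex the bytes, strip the
// leading zeros, and prefix the codepoint with "U+".
val df3 = df.withColumn(
  "unicode",
  transform(col("feeling"), x =>
    concat(lit("U+"), ltrim(hex(encode(x, "utf-32")), "0")))
)

Either form is an ordinary column expression, so unlike collecting and iterating over rows, it also works on the streaming dataframe from the question.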