Iterate over a Spark dataframe and store each row value in variables of another class
I want to iterate over a Spark dataframe and store the values of each row in the data members (global variables) of a class.
Code:
import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.{StructType, StructField, StringType, IntegerType}
import org.apache.spark.sql.Row

object Main {
  class Input_Class {
    var name: String = ""
    var age: String = ""
    var gender: String = ""
    def setter(src: Row) {
      var row = src.toSeq
      var i = 0
      name = (row(i)).toString
      i += 1
      age = (row(i)).toString
      i += 1
      gender = (row(i)).toString
    }
  }

  class Manager extends Serializable {
    var inputObj = new Input_Class()
    def inputSetter(src: Row) = {
      inputObj.setter(src)
    }
  }

  def main(args: Array[String]) {
    val spark = SparkSession.builder.appName("App").config("spark.master", "local").getOrCreate()
    val df = spark.read.csv("data.csv ")
    var ManagerObj = new Manager()
    df.rdd.map(row => {
      ManagerObj.inputSetter(row)
    })
    spark.stop()
  }
}
I am not sure whether this code is correct. Am I using the map operator wrongly? As the error says, the task is not serializable. Please help me; I am new to this and don't have much experience with it, and if there is a better or different way to achieve what I am doing, please suggest it.
This is the error I get:
20/06/03 17:44:13 WARN SparkSession$Builder: Using an existing SparkSession; some configuration may not take effect.
org.apache.spark.SparkException: Task not serializable
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:403)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:393)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:162)
at org.apache.spark.SparkContext.clean(SparkContext.scala:2326)
at org.apache.spark.rdd.RDD.$anonfun$map(RDD.scala:371)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
at org.apache.spark.rdd.RDD.map(RDD.scala:370)
at Main$.main(Untitled-2.scala:57)
... 51 elided
Caused by: java.io.NotSerializableException: Main$Manager
Serialization stack:
- object not serializable (class: Main$Manager, value: Main$Manager@108f206f)
- field (class: scala.runtime.ObjectRef, name: elem, type: class java.lang.Object)
- object (class scala.runtime.ObjectRef, Main$Manager@108f206f)
- element of array (index: 0)
- array (class [Ljava.lang.Object;, size 1)
- field (class: java.lang.invoke.SerializedLambda, name: capturedArgs, type: class [Ljava.lang.Object;)
- object (class java.lang.invoke.SerializedLambda, SerializedLambda[capturingClass=class Main$, functionalInterfaceMethod=scala/Function1.apply:(Ljava/lang/Object;)Ljava/lang/Object;, implementation=invokeStatic Main$.$anonfun$main$adapted:(Lscala/runtime/ObjectRef;Lorg/apache/spark/sql/Row;)Ljava/lang/Object;, instantiatedMethodType=(Lorg/apache/spark/sql/Row;)Ljava/lang/Object;, numCaptured=1])
- writeReplace data (class: java.lang.invoke.SerializedLambda)
- object (class Main$$$Lambda51/2090377899, Main$$$Lambda51/2090377899@7722c8e6)
at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:41)
at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46)
at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:400)
... 60 more
Thanks!
You are using the Manager class instance inside the closure. Please extend the Serializable interface in the Manager class:
class Manager extends Serializable {
  var inputObj = new Input_Class()
  def inputSetter(src: Row) = {
    inputObj.setter(src)
  }
}
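For reference, here is a minimal, self-contained sketch of that fix (my own illustration, not part of the original answer), assuming the same data.csv layout as in the question. Since Manager holds an Input_Class field, with default Java serialization Input_Class must also extend Serializable, or the next NotSerializableException would point at it. The sketch also uses a foreach action instead of map so the closure actually executes; note that each executor then mutates its own deserialized copy of the manager, not the driver-side instance.

import org.apache.spark.sql.{Row, SparkSession}

object SerializableClosureSketch {
  // Both classes are Serializable so the closure that captures the manager
  // can be shipped to executors without a "Task not serializable" error.
  class Input_Class extends Serializable {
    var name: String = ""
    var age: String = ""
    var gender: String = ""
    def setter(src: Row): Unit = {
      name = src.get(0).toString
      age = src.get(1).toString
      gender = src.get(2).toString
    }
  }

  class Manager extends Serializable {
    var inputObj = new Input_Class()
    def inputSetter(src: Row): Unit = inputObj.setter(src)
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder.appName("App").master("local").getOrCreate()
    val df = spark.read.csv("data.csv") // hypothetical path, as in the question
    val managerObj = new Manager()
    // foreach is an action, so the closure is serialized and run on executors;
    // each executor works on its own copy of managerObj.
    df.rdd.foreach(row => managerObj.inputSetter(row))
    spark.stop()
  }
}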