遍历 spark 数据框并将每一行值存储在另一个 class 的变量中

Iterate over a spark dataframe and store each row value in variables of another class

我想遍历 spark 数据框并将每一行的值存储在 类 数据成员(全局变量)中。

代码:

import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.{
  StructType, StructField, StringType, IntegerType}
import org.apache.spark.sql.Row

object Main {
  class Input_Class {
    var name: String = "";
    var age: String = "";
    var gender: String = "";

    def setter(src: Row) {
      var row = src.toSeq
      var i = 0;
      name = (row(i)).toString;
      i += 1;
      age = (row(i)).toString;
      i += 1;
      gender = (row(i)).toString;
    }
  }
  class Manager extends Serializable{
    var inputObj = new Input_Class();
    def inputSetter(src: Row) = {
        inputObj.setter(src);
    }
  }

  def main(args: Array[String]) {
    val spark = SparkSession.builder.appName("App").config("spark.master", "local").getOrCreate()
    val df = spark.read.csv("data.csv ");
    var ManagerObj = new Manager();
    df.rdd.map(row => {
        ManagerObj.inputSetter(row)
    })
    spark.stop()
  }
}

我不确定这段代码是否正确。我使用地图运算符错了吗?正如错误所说,它不可序列化。请帮助我,我是新手,对此没有太多经验,如果有更好或其他方法来实现我正在做的事情,请推荐。

这是我得到的错误:

20/06/03 17:44:13 WARN SparkSession$Builder: Using an existing SparkSession; some configuration may not take effect.
org.apache.spark.SparkException: Task not serializable                          
  at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:403)
  at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:393)
  at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:162)
  at org.apache.spark.SparkContext.clean(SparkContext.scala:2326)
  at org.apache.spark.rdd.RDD.$anonfun$map(RDD.scala:371)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
  at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
  at org.apache.spark.rdd.RDD.map(RDD.scala:370)
  at Main$.main(Untitled-2.scala:57)
  ... 51 elided
Caused by: java.io.NotSerializableException: Main$Manager
Serialization stack:
        - object not serializable (class: Main$Manager, value: Main$Manager@108f206f)
        - field (class: scala.runtime.ObjectRef, name: elem, type: class java.lang.Object)
        - object (class scala.runtime.ObjectRef, Main$Manager@108f206f)
        - element of array (index: 0)
        - array (class [Ljava.lang.Object;, size 1)
        - field (class: java.lang.invoke.SerializedLambda, name: capturedArgs, type: class [Ljava.lang.Object;)
        - object (class java.lang.invoke.SerializedLambda, SerializedLambda[capturingClass=class Main$, functionalInterfaceMethod=scala/Function1.apply:(Ljava/lang/Object;)Ljava/lang/Object;, implementation=invokeStatic Main$.$anonfun$main$adapted:(Lscala/runtime/ObjectRef;Lorg/apache/spark/sql/Row;)Ljava/lang/Object;, instantiatedMethodType=(Lorg/apache/spark/sql/Row;)Ljava/lang/Object;, numCaptured=1])
        - writeReplace data (class: java.lang.invoke.SerializedLambda)
        - object (class Main$$$Lambda51/2090377899, Main$$$Lambda51/2090377899@7722c8e6)
  at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:41)
  at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46)
  at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
  at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:400)
  ... 60 more

谢谢!

您正在闭包中使用 Manager class 实例。请extends Serializable界面在Manager

 class Manager extends Serializable {
    var inputObj = new Input_Class();
    def inputSetter(src: Row) = {
        inputObj.setter(src);
    }
  }