Iterating Through Spark DataFrame Takes Huge Time and Fails with Error OutOfMemoryError: GC overhead limit exceeded

I am processing 5 million records from an XML file. I load them into a Spark DataFrame and then try to load them into HBase using the DataFrame foreach method. After a short period of processing inside the foreach, the load either becomes extremely slow or fails with an out-of-memory error. Can anyone suggest a solution or a better approach?

Code

import org.apache.hadoop.hbase.{HBaseConfiguration, HColumnDescriptor, HTableDescriptor, TableName}
import org.apache.hadoop.hbase.client.{HBaseAdmin, HTable, Put, Result}
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.security.UserGroupInformation
import org.apache.spark.sql.functions.{current_date, current_timestamp}
import org.apache.spark.sql.types.{StringType, StructField, StructType}

val xmlSchemaXML = StructType(Array(
    StructField("A", StringType, nullable = true),
    StructField("B", StringType, nullable = true),
    StructField("C", StringType, nullable = true),
    StructField("D", StringType, nullable = true)
  ))

  //Get File In DataFrame
  var dfXML = sqlContext.read.format("com.databricks.spark.xml")
    .option("rootTag", "ABC")
    .option("rowTag", "AB")
    .schema(xmlSchemaXML)
    .load("file:///home/xyz.xml")
    .withColumn("as_of_date", current_date())
    .withColumn("last_updated_date", current_timestamp())


  //Create HBase Configuration
  val hBaseConf = HBaseConfiguration.create()

  //Set HBase Configurations
  hBaseConf.set("hadoop.security.authentication", "kerberos")
  hBaseConf.set("hbase.zookeeper.quorum", cluster)
  hBaseConf.set("hbase.zookeeper.property.client.port", "2181")

  //Login Using KeyTab
  UserGroupInformation.setConfiguration(hBaseConf)
  UserGroupInformation.loginUserFromKeytab("user", "file:///tmp/keytab.keytab")

  println("Creating Connection With HBase...")


  val hBaseAdmin = new HBaseAdmin(hBaseConf)

  /** *************Check if Table Already Exists or Create One ***************/

  if (!hBaseAdmin.isTableAvailable("ns:table_name")) {
    println("ns:table_name does not exist...")
    val tableDescriptor = new HTableDescriptor(TableName.valueOf("ns:table_name"))
    val columnDescriptor = new HColumnDescriptor(Bytes.toBytes("cf"))
    columnDescriptor.setVersions(1, 15)

    try {
      tableDescriptor.addFamily(columnDescriptor)
      hBaseAdmin.createTable(tableDescriptor)
      println("ns:table_name created...")
    }
    catch {
      case _: Throwable => println("table creation failed...")
    }
  }


  /** **************GET RECORD COUNT FROM ns:table_name ****************/
  var rowKeyCount: Long = 0
  try {
    hBaseConf.set(TableInputFormat.INPUT_TABLE, "ns:table_name")
    val hBaseRDD = sc.newAPIHadoopRDD(hBaseConf, classOf[TableInputFormat], classOf[ImmutableBytesWritable], classOf[Result])
    rowKeyCount = hBaseRDD.count() //count once and reuse the value
    println("Number of Records found in ns:table_name: " + rowKeyCount)
    println("====================================================================================")
  }
  catch {
    case _: NullPointerException =>
      println("Result NullPointerException: Table does not exist")
      rowKeyCount = 0
    case _: Throwable => println("table reading failed...")
  }

  println("----------------START READING DATA FROM DATAFRAME AND LOAD TO HBASE----------------")

  //Create HTable for ns:table_name
  val hTable = new HTable(hBaseConf, "ns:table_name")

  println("Total Rows in File: " + dfXML.count())


  var A = ""
  var B = ""
  var C = ""
  var D = ""
  var as_of_date = ""
  var last_updated_date = ""

  dfXML.limit(100000).collect().foreach(elem => {

    rowKeyCount = rowKeyCount + 1

    //Replace null values with empty strings before writing to HBase
    A = if (elem.getString(0) == null) "" else elem.getString(0)
    B = if (elem.getString(1) == null) "" else elem.getString(1)
    C = if (elem.getString(2) == null) "" else elem.getString(2)
    D = if (elem.getString(3) == null) "" else elem.getString(3)

    as_of_date = elem.getDate(4).toString
    last_updated_date = elem.getTimestamp(5).toString

    val put = new Put(rowKeyCount.toString.getBytes()) //Store RowKey

    put.addColumn("cf".getBytes(), "A".getBytes(), A.getBytes())
    put.addColumn("cf".getBytes(), "B".getBytes(), B.getBytes())
    put.addColumn("cf".getBytes(), "C".getBytes(), C.getBytes())
    put.addColumn("cf".getBytes(), "D".getBytes(), D.getBytes())
    put.addColumn("cf".getBytes(), "as_of_date".getBytes(), as_of_date.getBytes())
    put.addColumn("cf".getBytes(), "last_updated_date".getBytes(), last_updated_date.getBytes())

    //Commit to HBase
    hTable.put(put)

  })

  hTable.flushCommits();

What you need to do is increase the default number of partitions (100 in your case) to something better suited to your workload. Start with df.repartition(1000).foreachPartition(... and then check whether 1000 is too many or too few.
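A minimal sketch of that pattern, assuming the same "ns:table_name" table and "cf" column family as above; the ZooKeeper quorum string and the use of column A as the row key are placeholders, and it uses the HBase 1.x ConnectionFactory/BufferedMutator API instead of the HTable used in the question:

  import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
  import org.apache.hadoop.hbase.client.{ConnectionFactory, Put}
  import org.apache.hadoop.hbase.util.Bytes

  dfXML.repartition(1000).foreachPartition { rows =>
    //Configuration and Connection are not serializable, so build them inside
    //each partition on the executor rather than on the driver
    val conf = HBaseConfiguration.create()
    conf.set("hbase.zookeeper.quorum", "zk-host:2181") //placeholder quorum
    val connection = ConnectionFactory.createConnection(conf)
    val mutator = connection.getBufferedMutator(TableName.valueOf("ns:table_name"))

    try {
      rows.foreach { row =>
        val a = Option(row.getString(0)).getOrElse("")
        val put = new Put(Bytes.toBytes(a)) //placeholder row key
        put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("A"), Bytes.toBytes(a))
        mutator.mutate(put) //writes are buffered and flushed in batches
      }
    } finally {
      mutator.close() //flushes any remaining buffered mutations
      connection.close()
    }
  }

This avoids collecting all rows to the driver; each executor writes its own partitions directly to HBase.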

5 million records does not sound like a lot, so the more likely causes are that your individual records are large or that the executors do not have enough heap space allocated.
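On the heap-space side, the executor heap is controlled by spark.executor.memory (usually passed as --executor-memory to spark-submit). A rough Scala sketch with placeholder sizes that would need tuning for your record size and cluster:

  import org.apache.spark.SparkConf

  //Placeholder sizes, not recommendations; these only take effect if set
  //before the SparkContext is created, or when passed on spark-submit
  val sparkConf = new SparkConf()
    .setAppName("xml-to-hbase")             //placeholder app name
    .set("spark.executor.memory", "8g")     //heap per executor
    .set("spark.executor.instances", "10")  //number of executors on YARN
    .set("spark.driver.memory", "4g")       //in client mode pass --driver-memory instead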