Iterating Through Spark DataFrame Takes Huge Time and Fails with Error OutOfMemoryError: GC overhead limit exceeded
I am processing 5 million records from an XML file. I load them into a Spark DataFrame and then try to write them to HBase using the DataFrame's foreach method. The foreach either fails after very little processing time or loads extremely slowly, and I end up with an out-of-memory error. Can anyone suggest a solution or a better approach?
Code:
val xmlSchemaXML = StructType(Array(
  StructField("A", StringType, nullable = true),
  StructField("B", StringType, nullable = true),
  StructField("C", StringType, nullable = true),
  StructField("D", StringType, nullable = true)
))
//Get File In DataFrame
val dfXML = sqlContext.read.format("com.databricks.spark.xml")
  .option("rootTag", "ABC")
  .option("rowTag", "AB")
  .schema(xmlSchemaXML)
  .load("file:///home/xyz.xml")
  .withColumn("as_of_date", current_date())
  .withColumn("last_updated_date", current_timestamp())
//Create HBase Configuration
val hBaseConf = HBaseConfiguration.create()
//Set HBase Configurations
hBaseConf.set("hadoop.security.authentication", "kerberos")
hBaseConf.set("hbase.zookeeper.quorum", cluster)
hBaseConf.set("hbase.zookeeper.property.client.port", "2181")
//Login Using KeyTab
UserGroupInformation.setConfiguration(hBaseConf)
UserGroupInformation.loginUserFromKeytab("user", "file:///tmp/keytab.keytab")
println("Creating Connection With HBase...")
val hBaseAdmin = new HBaseAdmin(hBaseConf)
/** *************Check if Table Already Exists or Create One ***************/
if (!hBaseAdmin.isTableAvailable("ns:table_name")) {
  println("ns:table_name does not exist...")
  val tableDescriptor = new HTableDescriptor(TableName.valueOf("ns:table_name"))
  val columnDescriptor = new HColumnDescriptor(Bytes.toBytes("cf"))
  columnDescriptor.setVersions(1, 15)
  try {
    tableDescriptor.addFamily(columnDescriptor)
    hBaseAdmin.createTable(tableDescriptor)
    println("ns:table_name created...")
  } catch {
    case _: Throwable => println("table creation failed...")
  }
}
/** **************GET RECORD COUNT FROM ns:table_name ****************/
var rowKeyCount: Long = 0
try {
  hBaseConf.set(TableInputFormat.INPUT_TABLE, "ns:table_name")
  val hBaseRDD = sc.newAPIHadoopRDD(hBaseConf, classOf[TableInputFormat], classOf[ImmutableBytesWritable], classOf[Result])
  rowKeyCount = hBaseRDD.count() //count once and reuse, instead of scanning the table twice
  println("Number of Records found in ns:table_name: " + rowKeyCount)
  println("====================================================================================")
} catch {
  case _: NullPointerException => //must come before the catch-all case, or it is unreachable
    println("Result NullPointerException: Table does not exist")
    rowKeyCount = 0
  case _: Throwable => println("table reading failed...")
}
println("----------------START READING DATA FROM DATAFRAME AND LOAD TO HBASE----------------")
//Create HTable for ns:table_name
val hTable = new HTable(hBaseConf, "ns:table_name")
println("Total Rows in File: " + dfXML.count())
var A = ""
var B = ""
var C = ""
var D = ""
var as_of_date = ""
var last_updated_date = ""
dfXML.limit(100000).collect().foreach(elem => {
  //println(elem)
  rowKeyCount = rowKeyCount + 1
  //println("0")
  if (elem.getString(0) == null)
    A = ""
  else
    A = elem.getString(0)
  //println("1")
  if (elem.getString(1) == null)
    B = ""
  else
    B = elem.getString(1)
  //println("2")
  if (elem.getString(2) == null)
    C = ""
  else
    C = elem.getString(2)
  //println("3")
  if (elem.getString(3) == null)
    D = ""
  else
    D = elem.getString(3)
  //println("4")
  as_of_date = elem.getDate(4).toString
  //println("5")
  last_updated_date = elem.getTimestamp(5).toString
  val put = new Put(rowKeyCount.toString.getBytes()) //Store RowKey
  put.addColumn("cf".getBytes(), "A".getBytes(), A.getBytes())
  put.addColumn("cf".getBytes(), "B".getBytes(), B.getBytes())
  put.addColumn("cf".getBytes(), "C".getBytes(), C.getBytes())
  put.addColumn("cf".getBytes(), "D".getBytes(), D.getBytes())
  put.addColumn("cf".getBytes(), "as_of_date".getBytes(), as_of_date.getBytes())
  put.addColumn("cf".getBytes(), "last_updated_date".getBytes(), last_updated_date.getBytes())
  //Commit to HBase
  hTable.put(put)
  //println(rowKeyCount + " : Record written to HBase...")
})
hTable.flushCommits()
What you need to do is increase the default 100 partitions to a number better suited to your workload. Start with df.repartition(1000).foreachPartition(...) and then see whether 1000 is too many or too few.
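For reference, a minimal sketch of what a foreachPartition-based load could look like. This is not code from the question or the answer: it assumes an HBase 1.x client where ConnectionFactory and BufferedMutator are available, reuses the cluster, "ns:table_name" and "cf" names from the question, and replaces the driver-side rowKeyCount counter with a partition-based key, since a shared counter cannot be incremented across executors.
import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
import org.apache.hadoop.hbase.client.{ConnectionFactory, Put}
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.TaskContext

dfXML.repartition(1000).foreachPartition { rows =>
  // Build the configuration and connection on the executor; HBase connections are not
  // serializable, so they cannot be created on the driver and shipped inside the closure.
  val conf = HBaseConfiguration.create()
  conf.set("hbase.zookeeper.quorum", cluster)
  conf.set("hbase.zookeeper.property.client.port", "2181")
  // Kerberos login (as in the original driver code) may also be needed here, per executor.
  val connection = ConnectionFactory.createConnection(conf)
  // BufferedMutator batches puts client-side instead of issuing one RPC per row.
  val mutator = connection.getBufferedMutator(TableName.valueOf("ns:table_name"))
  try {
    rows.zipWithIndex.foreach { case (elem, i) =>
      val a = Option(elem.getString(0)).getOrElse("")
      val b = Option(elem.getString(1)).getOrElse("")
      val c = Option(elem.getString(2)).getOrElse("")
      val d = Option(elem.getString(3)).getOrElse("")
      // A driver-side counter does not work across executors; here the key is derived
      // from the partition id plus the row's position inside the partition.
      val rowKey = s"${TaskContext.getPartitionId()}_$i"
      val put = new Put(Bytes.toBytes(rowKey))
      put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("A"), Bytes.toBytes(a))
      put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("B"), Bytes.toBytes(b))
      put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("C"), Bytes.toBytes(c))
      put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("D"), Bytes.toBytes(d))
      put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("as_of_date"), Bytes.toBytes(elem.getDate(4).toString))
      put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("last_updated_date"), Bytes.toBytes(elem.getTimestamp(5).toString))
      mutator.mutate(put)
    }
  } finally {
    mutator.close()    // flushes any buffered puts
    connection.close()
  }
}
If the records carry a natural key, using it instead of the partition-based key above is usually the better choice.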
Five million records does not sound like a large number; most likely your records are large, or the executors do not have enough heap space allocated.
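If heap really is the limit, executor memory is normally raised when the job is submitted (for example with spark-submit's --executor-memory flag). As a rough sketch, the same settings can also be placed on the SparkConf before the context is created; the application name and the values below are placeholders, not recommendations.
import org.apache.spark.{SparkConf, SparkContext}

val sparkConf = new SparkConf()
  .setAppName("xml-to-hbase")                        // hypothetical app name
  .set("spark.executor.memory", "8g")                // executor heap (placeholder value)
  .set("spark.yarn.executor.memoryOverhead", "1024") // off-heap overhead in MB (Spark 1.x property name)
val sc = new SparkContext(sparkConf)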