Null Pointer Exception When Trying to Use a Persisted Table in Spark Streaming
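I create "gpsLookUpTable" at the start and persist it so that I don't have to pull it over and over again to do the mapping. However, when I try to access it inside the foreach, I get a null pointer exception. Any help is appreciated.
Here is the code snippet: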
def main(args: Array[String]): Unit = {
  val conf = new SparkConf() ...
  val sc = new SparkContext(conf)
  val ssc = new StreamingContext(sc, Seconds(20))
  val sqc = new SQLContext(sc)
  // Trying to cache table here to use it below
  val gpsLookUpTable = MapInput.cacheMappingTables(sc, sqc).persist(StorageLevel.MEMORY_AND_DISK_SER_2)
  //sc.broadcast(gpsLookUpTable)
  ssc.textFileStream("hdfs://localhost:9000/inputDirectory/")
    .foreachRDD { rdd =>
      if (!rdd.partitions.isEmpty) {
        val allRows = sc.textFile("hdfs://localhost:9000/supportFiles/GeoHashLookUpTable")
        sqc.read.json(allRows).registerTempTable("GeoHashLookUpTable")
        val header = rdd.first().split(",")
        val rowsWithoutHeader = Utils.dropHeader(rdd)
        rowsWithoutHeader.foreach { row =>
          val singleRowArray = row.split(",")
          singleRowArray.foreach(println)
          (header, singleRowArray).zipped
            .foreach { (x, y) =>
              // Trying to access persisted table but getting null pointer exception
              val selectedRow = gpsLookUpTable
                .filter("geoCode LIKE '" + GeoHash.subString(lattitude, longitude) + "%'")
                .withColumn("Distance", calculateDistance(col("Lat"), col("Lon")))
                .orderBy("Distance")
                .select("TrackKM", "TrackName").take(1)
              if (selectedRow.length != 0) {
                // do something
              }
              else {
                // do something
              }
            }
        }
      }
    }
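I assume you are running on a cluster; your foreach will run as a closure on other nodes. The NullPointerException is raised because the closure runs on nodes where gpsLookUpTable has not been initialized. You evidently did try to broadcast gpsLookUpTable in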
//sc.broadcast(gpsLookUpTable)
but the result needs to be bound to a variable, basically like this:
val tableBC = sc.broadcast(gpsLookUpTable)
Inside the foreach, you would replace this:
foreach { (x, y) =>
///Trying to access persisted table but getting null pointer exception
val selectedRow = gpsLookUpTable
with this:
foreach { (x, y) =>
///Trying to access persisted table but getting null pointer exception
val selectedRow = tableBC.value
This effectively gives you access to the broadcast value.
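As a rough illustration of the broadcast pattern described above, here is a minimal sketch. It rests on assumptions that are not in the original post. A DataFrame is a handle that lives on the driver, so broadcasting the handle alone may still not make it queryable inside an executor closure. The sketch therefore assumes the lookup table is small enough to be collected into a plain local collection first, and broadcasts that collection. The TrackPoint case class, the nearest helper, the sample rows, and the local[2] master are all hypothetical. The straight-line distance in degrees is only a stand-in for calculateDistance.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.broadcast.Broadcast

// Hypothetical row type standing in for one entry of gpsLookUpTable.
final case class TrackPoint(geoCode: String, lat: Double, lon: Double,
                            trackKM: Double, trackName: String)

object BroadcastLookupSketch {
  // Plain Scala lookup that can run on executors: keep the rows whose geohash
  // starts with the prefix, then pick the closest one. The straight-line
  // distance in degrees is an assumption, not the original calculateDistance.
  def nearest(table: Seq[TrackPoint], prefix: String,
              lat: Double, lon: Double): Option[TrackPoint] = {
    val candidates = table.filter(_.geoCode.startsWith(prefix))
    if (candidates.isEmpty) None
    else Some(candidates.minBy(p => math.hypot(p.lat - lat, p.lon - lon)))
  }

  def main(args: Array[String]): Unit = {
    // local[2] is assumed here only so that the sketch runs on one machine.
    val conf = new SparkConf().setAppName("broadcast-lookup-sketch").setMaster("local[2]")
    val sc = new SparkContext(conf)
    try {
      // Made-up sample rows; in the question these would come from collecting
      // the lookup table on the driver.
      val localTable: Seq[TrackPoint] = Seq(
        TrackPoint("u4pru", 57.64, 10.40, 12.5, "North-A"),
        TrackPoint("u4prv", 57.66, 10.42, 13.0, "North-B"),
        TrackPoint("ezs42", 42.60, -5.60, 3.0, "South"))

      // Bind the broadcast to a variable so the closure can capture it.
      val tableBC: Broadcast[Seq[TrackPoint]] = sc.broadcast(localTable)

      // Each query is (geohash prefix, latitude, longitude).
      val queries = sc.parallelize(Seq(("u4pr", 57.645, 10.405), ("zzzz", 0.0, 0.0)))

      // On the executors, read the data through tableBC.value.
      val results = queries.map { case (prefix, lat, lon) =>
        nearest(tableBC.value, prefix, lat, lon).map(_.trackName)
      }.collect()

      results.foreach(println)
    } finally {
      sc.stop()
    }
  }
}
```

Keeping the lookup in an ordinary function such as nearest means the closure only needs the broadcast variable and plain values, not the SQLContext or the DataFrame. If the table is too large to collect, a join against the incoming data on the driver side may be a better fit than a broadcast.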