Spark cannot serialize the BufferedImage class
I am getting a Not Serializable Class exception in Spark 2.2.0. The following is the procedure I am trying to carry out in Scala:

- Read a set of JPEG images from HDFS.
- Build an array of java.awt.image.BufferedImage objects.
- Extract each java.awt.image.BufferedImage buffer and store it in a two-dimensional array per image, building an Array[Array[Int]] with the image buffer information for every image.
- Transform the resulting Array[Array[Int]] values into an org.apache.spark.rdd.RDD[Array[Array[Int]]] using the sc.parallelize method.
- Perform the image processing operations distributively by transforming the initial org.apache.spark.rdd.RDD[Array[Array[Int]]].
Here is the code:
import org.apache.spark.sql.SparkSession
import javax.imageio.ImageIO
import java.io.ByteArrayInputStream

def binarize(image: Array[Array[Int]], threshold: Int): Array[Array[Int]] = {
  val height = image.size
  val width = image(0).size
  val result = Array.ofDim[Int](height, width)
  for (i <- 0 until height) {
    for (j <- 0 until width) {
      result(i)(j) = if (image(i)(j) <= threshold) 0 else 255
    }
  }
  result
}
object imageTestObj {
  def main(args: Array[String]) {
    val spark = SparkSession.builder().appName("imageTest2").getOrCreate()
    val sc = spark.sparkContext

    val saveToHDFS = false
    val threshold: Int = 128
    val partitions = 32
    val inPathStr = "hdfs://192.168.239.218:9000/vitrion/input"
    val outPathStr = if (saveToHDFS) "hdfs://192.168.239.54:9000/vitrion/output/" else "/home/vitrion/IdeaProjects/imageTest2/output/"

    val files = sc.binaryFiles(inPathStr).collect
    val AWTImageArray = files.map { binFile =>
      val input = binFile._2.open()
      val name = binFile._1
      var buffer: Array[Byte] = Array.fill(input.available)(0)
      input.readFully(buffer)
      ImageIO.read(new ByteArrayInputStream(buffer))
    }
    val ImgBuffers = AWTImageArray.map { image =>
      val height = image.getHeight
      val width = image.getWidth
      val buffer = Array.ofDim[Int](height, width)
      for (i <- 0 until height) {
        for (j <- 0 until width) {
          buffer(i)(j) = image.getRaster.getDataBuffer.getElem(0, i * width + j)
        }
      }
      buffer
    }
    val inputImages = sc.parallelize(ImgBuffers, partitions).cache()
    val op1 = inputImages.map(image => binarize(image, threshold))
  }
}
This algorithm fails with the well-known exception:
org.apache.spark.SparkException: Task not serializable
...
Caused by: java.io.NotSerializableException: java.awt.image.BufferedImage
Serialization stack:
- object not serializable (class: java.awt.image.BufferedImage, ...
I do not understand why Spark tries to serialize the BufferedImage class before the first RDD is even created in the application. Isn't the BufferedImage class only supposed to be serialized if I try to create an RDD[BufferedImage]?

Can someone explain what is going on here?

Thank you in advance...
What you are actually serializing is a function that you hand to Spark. That function must not contain references to non-serializable classes. Instantiating a non-serializable class inside the function is fine, but referencing an instance of a non-serializable class from inside the function is not.

Most likely one of the functions you pass to Spark references a BufferedImage instance.

Check your code and make sure none of your functions closes over a BufferedImage object.
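To make the distinction concrete, here is a minimal sketch of the two cases (my own illustration, not code from the question; the HDFS path and object name are placeholders):

import java.io.ByteArrayInputStream
import javax.imageio.ImageIO
import org.apache.spark.sql.SparkSession

// Minimal sketch contrasting a closure that captures a BufferedImage with one
// that instantiates the BufferedImage locally on the executor.
object ClosureCaptureSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("closureCaptureSketch").getOrCreate()
    val sc = spark.sparkContext

    // RDD of raw JPEG bytes (placeholder path).
    val bytesRdd = sc.binaryFiles("hdfs://namenode:9000/images")
      .map { case (_, stream) => stream.toArray() }

    // FAILS: `driverImage` lives on the driver, so shipping this closure would
    // force Spark to serialize a BufferedImage -> Task not serializable.
    // val driverImage = ImageIO.read(new java.io.File("/tmp/sample.jpg"))
    // val widths = bytesRdd.map(_ => driverImage.getWidth)

    // WORKS: the BufferedImage is created inside the closure on the executor
    // and never crosses the driver/executor boundary.
    val widths = bytesRdd.map { bytes =>
      ImageIO.read(new ByteArrayInputStream(bytes)).getWidth
    }
    widths.collect().foreach(println)
  }
}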
By inlining some of the code so that no BufferedImage object is serialized, I think you can get past the exception. Could you try this code (I have not run it myself)?:
object imageTestObj {
  def main(args: Array[String]) {
    val spark = SparkSession.builder().appName("imageTest2").getOrCreate()
    val sc = spark.sparkContext

    val saveToHDFS = false
    val threshold: Int = 128
    val partitions = 32
    val inPathStr = "hdfs://192.168.239.218:9000/vitrion/input"
    val outPathStr = if (saveToHDFS) "hdfs://192.168.239.54:9000/vitrion/output/" else "/home/vitrion/IdeaProjects/imageTest2/output/"

    val ImgBuffers = sc.binaryFiles(inPathStr).collect.map { binFile =>
      val input = binFile._2.open()
      val name = binFile._1
      // Raw JPEG bytes read from the HDFS stream.
      val bytes: Array[Byte] = Array.fill(input.available)(0)
      input.readFully(bytes)
      val image = ImageIO.read(new ByteArrayInputStream(bytes))
      // Inlining must be here, so that BufferedImage is not serialized.
      val height = image.getHeight
      val width = image.getWidth
      val buffer = Array.ofDim[Int](height, width)
      for (i <- 0 until height) {
        for (j <- 0 until width) {
          buffer(i)(j) = image.getRaster.getDataBuffer.getElem(0, i * width + j)
        }
      }
      buffer
    }
    val inputImages = sc.parallelize(ImgBuffers, partitions).cache()
    val op1 = inputImages.map(image => binarize(image, threshold))
  }
}
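A further variant (my own untested sketch, not part of the original answer): move the decode-to-buffer step into a method on a separate top-level object, so that the BufferedImage stays strictly local to that method, and let the executors decode the images instead of collecting everything to the driver first. The names ImageHelpers, bytesToGrayBuffer, and imageTestDistributed are illustrative:

import java.io.ByteArrayInputStream
import javax.imageio.ImageIO
import org.apache.spark.sql.SparkSession

object ImageHelpers {
  // The BufferedImage exists only inside this method, so no Spark closure
  // ever holds a reference to it.
  def bytesToGrayBuffer(bytes: Array[Byte]): Array[Array[Int]] = {
    val image = ImageIO.read(new ByteArrayInputStream(bytes))
    val height = image.getHeight
    val width = image.getWidth
    val buffer = Array.ofDim[Int](height, width)
    for (i <- 0 until height; j <- 0 until width) {
      buffer(i)(j) = image.getRaster.getDataBuffer.getElem(0, i * width + j)
    }
    buffer
  }
}

object imageTestDistributed {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("imageTestDistributed").getOrCreate()
    val sc = spark.sparkContext
    // Decode on the executors; only byte arrays and Int buffers ever move
    // between driver and executors.
    val inputImages = sc.binaryFiles("hdfs://192.168.239.218:9000/vitrion/input")
      .map { case (_, stream) => ImageHelpers.bytesToGrayBuffer(stream.toArray()) }
      .cache()
  }
}

A call to a method on a top-level object compiles to a static-style call on the module instance, so nothing from the driver's scope has to be serialized into the closure.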