Spark 无法序列化 BufferedImage class

Spark can not serialize the BufferedImage class

我在 Spark 2.2.0 中有一个 Not Serializable Class 异常。 以下过程是我在 Scala 中尝试做的:

  1. 从 HDFS 读取一组 JPEG 图片。
  2. 构建java.awt.image.BufferedImageS数组。
  3. 提取java.awt.image.BufferedImage缓冲区并将其存储在每个图像的二维数组中,方法是构建包含图像缓冲区信息的二维数组数组Array[Array[Int]]
  4. 使用 sc.parallelize 方法将 Array[Array[Int]] 转换为 org.apache.spark.rdd.RDD[Array[Array[Int]]]
  5. 通过变换初始 org.apache.spark.rdd.RDD[Array[Array[Int]]].
  6. 来分布式执行图像处理操作

这是代码:

import org.apache.spark.sql.SparkSession
import javax.imageio.ImageIO
import java.io.ByteArrayInputStream

def binarize(image: Array[Array[Int]], threshold: Int) : Array[Array[Int]] = {
    val height = image.size
    val width = image(0).size
    val result = Array.ofDim[Int](height, width)
    for (i <- 0 until height) {
        for (j <- 0 until width){
            result(i)(j) = if (image(i)(j) <= threshold)  0 else 255
        }
    }
    result
}

object imageTestObj {
    def main(args: Array[String]) {
        val spark = SparkSession.builder().appName("imageTest2").getOrCreate()
        val sc = spark.sparkContext
        val saveToHDFS = false
        val threshold: Int = 128
        val partitions = 32
        val inPathStr = "hdfs://192.168.239.218:9000/vitrion/input"
        val outPathStr = if (saveToHDFS) "hdfs://192.168.239.54:9000/vitrion/output/" else "/home/vitrion/IdeaProjects/imageTest2/output/"

        val files = sc.binaryFiles(inPathStr).collect

        val AWTImageArray = files.map { binFile =>
            val input = binFile._2.open()
            val name = binFile._1
            var buffer: Array[Byte] = Array.fill(input.available)(0)
            input.readFully(buffer)
            ImageIO.read(new ByteArrayInputStream(buffer))
        }

        val ImgBuffers = AWTImageArray.map { image =>
            val height = image.getHeight
            val width = image.getWidth
            val buffer = Array.ofDim[Int](height, width)
            for (i <- 0 until height) {
                for (j <- 0 until width){
                    buffer(i)(j) = image.getRaster.getDataBuffer.getElem(0, i * width + j)
                }
            }
            buffer
        }

        val inputImages = sc.parallelize(ImgBuffers, partitions).cache()

        val op1 = inputImages.map(image => binarize(image, threshold))
    }
}

这个算法有一个非常著名的异常:

org.apache.spark.SparkException: Task not serializable
...
Caused by: java.io.NotSerializableException: java.awt.image.BufferedImage
Serialization stack:
- object not serializable (class: java.awt.image.BufferedImage, ...

我不明白为什么 Spark 在应用程序中创建第一个 RDD 之前尝试序列化 BufferedImage class。如果我尝试创建一个 RDD[BufferedImage],难道 BufferedImage class 不应该被序列化吗?

谁能给我解释一下这是怎么回事?

提前谢谢你...

实际上您正在序列化 Spark 中的一个函数。此函数不能包含对不可序列化 类 的引用。您可以在函数中实例化不可序列化 类(OK),但不能在函数中引用不可序列化 类 的实例。

很可能您正在引用您用于 BufferedImage 实例的函数之一。

检查您的代码,看看您是否没有从函数引用 BufferedImage 对象。

通过内联一些代码而不序列化 BufferedImage 对象,我想你可以克服异常。你能试试这段代码吗(我自己没有执行)?:

object imageTestObj {
  def main(args: Array[String]) {
    val spark = SparkSession.builder().appName("imageTest2").getOrCreate()
    val sc = spark.sparkContext
    val saveToHDFS = false
    val threshold: Int = 128
    val partitions = 32
    val inPathStr = "hdfs://192.168.239.218:9000/vitrion/input"
    val outPathStr = if (saveToHDFS) "hdfs://192.168.239.54:9000/vitrion/output/" else "/home/vitrion/IdeaProjects/imageTest2/output/"

    val ImgBuffers = sc.binaryFiles(inPathStr).collect.map { binFile =>

      val input = binFile._2.open()
      val name = binFile._1
      var buffer: Array[Byte] = Array.fill(input.available)(0)
      input.readFully(buffer)
      val image = ImageIO.read(new ByteArrayInputStream(buffer))
      // Inlining must be here, so that BufferedImage is not serialized.
      val height = image.getHeight
      val width = image.getWidth
      val buffer = Array.ofDim[Int](height, width)
      for (i <- 0 until height) {
        for (j <- 0 until width){
          buffer(i)(j) = image.getRaster.getDataBuffer.getElem(0, i * width + j)
        }
      }
      buffer
    }

    val inputImages = sc.parallelize(ImgBuffers, partitions).cache()

    val op1 = inputImages.map(image => binarize(image, threshold))
  }
}