如何使用 Scala 在 Spark Streaming 中序列化 org.joda.time.DateTime?

How do I serialize org.joda.time.DateTime in Spark Streaming using Scala?

我创建了一个 DummySource,它从文件中读取行并将其转换为 TaxiRide 对象。问题是有些字段对应于 org.joda.time.DateTime,我使用 org.joda.time.format.{DateTimeFormat, DateTimeFormatter} 并且 SparkStreaming 无法序列化这些字段。

如何让 SparkStreaming 序列化它们?我的代码连同错误一起在下面。

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.sense.spark.util.TaxiRideSource

object TaxiRideCountCombineByKey {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf()
      .setAppName("TaxiRideCountCombineByKey")
      .setMaster("local[4]")
    val ssc = new StreamingContext(sparkConf, Seconds(1))
    val stream = ssc.receiverStream(new TaxiRideSource())
    stream.print()

    ssc.start() // Start the computation
    ssc.awaitTermination() // Wait for the computation to terminate
  }
}

import java.io.{BufferedReader, FileInputStream, InputStreamReader}
import java.nio.charset.StandardCharsets
import java.util.Locale
import java.util.zip.GZIPInputStream

import org.apache.spark.storage._
import org.apache.spark.streaming.receiver._
import org.joda.time.DateTime
import org.joda.time.format.{DateTimeFormat, DateTimeFormatter}

case class TaxiRide(rideId: Long, isStart: Boolean, startTime: DateTime, endTime: DateTime,
                    startLon: Float, startLat: Float, endLon: Float, endLat: Float,
                    passengerCnt: Short, taxiId: Long, driverId: Long)

class TaxiRideSource extends Receiver[TaxiRide](StorageLevel.MEMORY_AND_DISK_2) {

  val timeFormatter: DateTimeFormatter = DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss").withLocale(Locale.US).withZoneUTC()
  val dataFilePath = "/home/flink/nycTaxiRides.gz";
  val delayInNanoSeconds: Long = 1000000

  def onStart() {new Thread("TaxiRide Source") {override def run() {receive()}}.start()}

  def onStop() {}

  private def receive() {
    while (!isStopped()) {
      val gzipStream = new GZIPInputStream(new FileInputStream(dataFilePath))
      val reader: BufferedReader = new BufferedReader(new InputStreamReader(gzipStream, StandardCharsets.UTF_8))
      var line: String = ""
      while (reader.ready() && (line = reader.readLine()) != null) {
        val startTime = System.nanoTime
        // read the line on the file and yield the object
        val taxiRide: TaxiRide = getTaxiRideFromString(line)
        store(Iterator(taxiRide))

        Thread.sleep(1000)
      }
    }
  }

  def getTaxiRideFromString(line: String): TaxiRide = {
    // println(line)
    val tokens: Array[String] = line.split(",")
    if (tokens.length != 11) {
      throw new RuntimeException("Invalid record: " + line)
    }

    val rideId: Long = tokens(0).toLong
    val (isStart, startTime, endTime) = tokens(1) match {
      case "START" => (true, DateTime.parse(tokens(2), timeFormatter), DateTime.parse(tokens(3), timeFormatter))
      case "END" => (false, DateTime.parse(tokens(2), timeFormatter), DateTime.parse(tokens(3), timeFormatter))
      case _ => throw new RuntimeException("Invalid record: " + line)
    }
    val startLon: Float = if (tokens(4).length > 0) tokens(4).toFloat else 0.0f
    val startLat: Float = if (tokens(5).length > 0) tokens(5).toFloat else 0.0f
    val endLon: Float = if (tokens(6).length > 0) tokens(6).toFloat else 0.0f
    val endLat: Float = if (tokens(7).length > 0) tokens(7).toFloat else 0.0f
    val passengerCnt: Short = tokens(8).toShort
    val taxiId: Long = tokens(9).toLong
    val driverId: Long = tokens(10).toLong

    TaxiRide(rideId, isStart, startTime, endTime, startLon, startLat, endLon, endLat, passengerCnt, taxiId, driverId)
  }
}

错误是这样的:

20/06/17 11:27:26 ERROR TaskSetManager: Failed to serialize task 59, not attempting to retry it.
java.io.NotSerializableException: org.joda.time.format.DateTimeFormatter
Serialization stack:
    - object not serializable (class: org.joda.time.format.DateTimeFormatter, value: org.joda.time.format.DateTimeFormatter@19bf2eee)
    - field (class: org.sense.spark.util.TaxiRideSource, name: timeFormatter, type: class org.joda.time.format.DateTimeFormatter)
    - object (class org.sense.spark.util.TaxiRideSource, org.sense.spark.util.TaxiRideSource@2fef7647)
    - element of array (index: 0)
    - array (class [Lorg.apache.spark.streaming.receiver.Receiver;, size 1)
    - field (class: scala.collection.mutable.WrappedArray$ofRef, name: array, type: class [Ljava.lang.Object;)
    - object (class scala.collection.mutable.WrappedArray$ofRef, WrappedArray(org.sense.spark.util.TaxiRideSource@2fef7647))
    - writeObject data (class: org.apache.spark.rdd.ParallelCollectionPartition)
    - object (class org.apache.spark.rdd.ParallelCollectionPartition, org.apache.spark.rdd.ParallelCollectionPartition@107f)
    - field (class: org.apache.spark.scheduler.ResultTask, name: partition, type: interface org.apache.spark.Partition)
    - object (class org.apache.spark.scheduler.ResultTask, ResultTask(59, 0))
    at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46)
    at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
    at org.apache.spark.scheduler.TaskSetManager$$anonfun$resourceOffer.apply(TaskSetManager.scala:472)
    at org.apache.spark.scheduler.TaskSetManager$$anonfun$resourceOffer.apply(TaskSetManager.scala:453)
    at scala.Option.map(Option.scala:146)
    at org.apache.spark.scheduler.TaskSetManager.resourceOffer(TaskSetManager.scala:453)
    at org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$org$apache$spark$scheduler$TaskSchedulerImpl$$resourceOfferSingleTaskSet.apply$mcVI$sp(TaskSchedulerImpl.scala:295)
    at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:160)
    at org.apache.spark.scheduler.TaskSchedulerImpl.org$apache$spark$scheduler$TaskSchedulerImpl$$resourceOfferSingleTaskSet(TaskSchedulerImpl.scala:290)
    at org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$resourceOffers$$anonfun$apply.apply(TaskSchedulerImpl.scala:375)
    at org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$resourceOffers$$anonfun$apply.apply(TaskSchedulerImpl.scala:373)
    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
    at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
    at org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$resourceOffers.apply(TaskSchedulerImpl.scala:373)
    at org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$resourceOffers.apply(TaskSchedulerImpl.scala:370)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
    at org.apache.spark.scheduler.TaskSchedulerImpl.resourceOffers(TaskSchedulerImpl.scala:370)
    at org.apache.spark.scheduler.local.LocalEndpoint.reviveOffers(LocalSchedulerBackend.scala:85)
    at org.apache.spark.scheduler.local.LocalEndpoint$$anonfun$receive.applyOrElse(LocalSchedulerBackend.scala:64)
    at org.apache.spark.rpc.netty.Inbox$$anonfun$process.apply$mcV$sp(Inbox.scala:117)
    at org.apache.spark.rpc.netty.Inbox.safelyCall(Inbox.scala:205)
    at org.apache.spark.rpc.netty.Inbox.process(Inbox.scala:101)
    at org.apache.spark.rpc.netty.Dispatcher$MessageLoop.run(Dispatcher.scala:213)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
20/06/17 11:27:26 INFO TaskSchedulerImpl: Removed TaskSet 59.0, whose tasks have all completed, from pool 

据我所知你不能序列化它

最好的选择是将其创建为常量

package abc.xyz
object Constant {
   val timeFormatter: DateTimeFormatter = DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss")
      .withLocale(Locale.US).withZoneUTC()
}

在您要使用它的地方添加下面的导入。

import abc.xyz.Constant._