Spark Streaming "ERROR JobScheduler: error in job generator"

I built a Spark Streaming application that continuously receives messages from Kafka and then writes them to an HBase table.

The application ran fine for the first 25 minutes. When I typed KV pairs such as 1;name1 and 2;name2 into kafka-console-producer, they were saved in the HBase table:

ROW       COLUMN+CELL
 1        column=cf1:column-Name, timestamp=1471905340560, value=name1
 2        column=cf1:column-Name, timestamp=1471905348165, value=name2

But after about 25 minutes my application stopped with the error ERROR JobScheduler: Error in job generator. The details of the error are shown below:

16/08/29 18:01:10 ERROR JobScheduler: Error in job generator
java.lang.IllegalArgumentException: requirement failed
        at scala.Predef$.require(Predef.scala:221)
        at org.apache.spark.streaming.scheduler.ReceivedBlockTracker.cleanupOldBatches(ReceivedBlockTracker.scala:166)
        at org.apache.spark.streaming.scheduler.ReceiverTracker.cleanupOldBlocksAndBatches(ReceiverTracker.scala:223)
        at org.apache.spark.streaming.scheduler.JobGenerator.clearMetadata(JobGenerator.scala:272)
        at org.apache.spark.streaming.scheduler.JobGenerator.org$apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:182)
        at org.apache.spark.streaming.scheduler.JobGenerator$$anon.onReceive(JobGenerator.scala:87)
        at org.apache.spark.streaming.scheduler.JobGenerator$$anon.onReceive(JobGenerator.scala:86)
        at org.apache.spark.util.EventLoop$$anon.run(EventLoop.scala:48)
Exception in thread "main" java.lang.IllegalArgumentException: requirement failed
        at scala.Predef$.require(Predef.scala:221)
        at org.apache.spark.streaming.scheduler.ReceivedBlockTracker.cleanupOldBatches(ReceivedBlockTracker.scala:166)
        at org.apache.spark.streaming.scheduler.ReceiverTracker.cleanupOldBlocksAndBatches(ReceiverTracker.scala:223)
        at org.apache.spark.streaming.scheduler.JobGenerator.clearMetadata(JobGenerator.scala:272)
        at org.apache.spark.streaming.scheduler.JobGenerator.org$apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:182)
        at org.apache.spark.streaming.scheduler.JobGenerator$$anon.onReceive(JobGenerator.scala:87)
        at org.apache.spark.streaming.scheduler.JobGenerator$$anon.onReceive(JobGenerator.scala:86)
        at org.apache.spark.util.EventLoop$$anon.run(EventLoop.scala:48)
16/08/29 18:01:10 INFO StreamingContext: Invoking stop(stopGracefully=false) from shutdown hook
16/08/29 18:01:10 INFO JobGenerator: Stopping JobGenerator immediately

It works fine for the first 25 minutes, but after that, for some reason I cannot figure out, the job generator suddenly seems to stop working correctly.

My code is as follows:

import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.spark.SparkConf
import org.apache.spark.streaming.StreamingContext
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.mapred.JobConf
import org.apache.spark.streaming.Seconds
import org.apache.spark.streaming.kafka._
import kafka.serializer.StringDecoder
import org.apache.hadoop.io.{LongWritable, Writable, IntWritable, Text}
import org.apache.hadoop.mapreduce.Job

object ReceiveKafkaAsDstream {
  case class SampleKafkaRecord(id: String, name: String)
  object SampleKafkaRecord extends Serializable {
    def parseToSampleRecord(line: String): SampleKafkaRecord = {
      val values = line.split(";")
      SampleKafkaRecord(values(0), values(1))
    }

    def SampleToHbasePut(CSVData: SampleKafkaRecord): (ImmutableBytesWritable, Put) = {
      val rowKey = CSVData.id
      val putOnce = new Put(rowKey.getBytes)

      putOnce.addColumn("cf1".getBytes, "column-Name".getBytes, CSVData.name.getBytes)
      return (new ImmutableBytesWritable(rowKey.getBytes), putOnce)
    }
  }


  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("ReceiveKafkaAsDstream")
    val ssc = new StreamingContext(sparkConf, Seconds(1))

    val topics = "test"
    val brokers = "10.0.2.15:6667"

    val topicSet = topics.split(",").toSet
    val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers,
        "zookeeper.connection.timeout.ms" -> "1000")

    val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicSet)

    val tableName = "KafkaTable"
    val conf = HBaseConfiguration.create()
    conf.set(TableOutputFormat.OUTPUT_TABLE, tableName)
    conf.set("zookeeper.znode.parent", "/hbase-unsecure")
    conf.set("hbase.zookeeper.property.clientPort", "2181")

    val job = Job.getInstance(conf)
    job.setOutputKeyClass(classOf[Text])
    job.setOutputValueClass(classOf[Text])
    job.setOutputFormatClass(classOf[TableOutputFormat[Text]])

    val records = messages
      .map(_._2)
      .map(SampleKafkaRecord.parseToSampleRecord)

    records.foreachRDD { rdd =>
      rdd.map(SampleKafkaRecord.SampleToHbasePut).saveAsNewAPIHadoopDataset(job.getConfiguration)
    }
    records.print()  

    ssc.start()
    ssc.awaitTermination()
  }
}

It feels like a configuration problem. Any help is appreciated.

I added a property named zookeeper.session.timeout.ms by adding this code:

val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers,
    "zookeeper.connect" -> xxxxxx:2181",
    "zookeeper.connection.timeout.ms" -> "10000",
    "zookeeper.session.timeout.ms" -> "10000")

and set the Spark Streaming batch interval to 10 seconds. With these changes, my Spark Streaming application keeps running for a long time.
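
For completeness, the batch-interval change is just the second argument to the StreamingContext constructor; a minimal sketch, reusing the sparkConf from the code above:

// Batch interval raised from 1 second to 10 seconds.
val ssc = new StreamingContext(sparkConf, Seconds(10))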

However, when I check the memory usage, it still keeps decreasing, and I don't know how to solve that.

This is very likely a clock synchronization problem. Try enabling NTP to make sure all cluster nodes are synchronized to the same time.
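
If it helps, here is a rough way to check for drift before or after enabling NTP (just a sketch, assuming the SparkContext behind the ssc from the question; the offset printout is illustrative only):

// Ask each executor for its current time and compare it against the driver's
// clock. The offsets include some task latency, but a drift of seconds or
// minutes between nodes will stand out clearly.
val sc = ssc.sparkContext
val driverTime = System.currentTimeMillis()
val executorTimes = sc
  .parallelize(1 to 100, 10)   // spread small tasks across the executors
  .map(_ => (java.net.InetAddress.getLocalHost.getHostName, System.currentTimeMillis()))
  .reduceByKey((first, _) => first)   // keep one timestamp sample per host
  .collect()

executorTimes.foreach { case (host, time) =>
  println(s"$host clock offset vs. driver: ${time - driverTime} ms")
}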