火花流 "ERROR JobScheduler: error in job generator"
Spark Streaming "ERROR JobScheduler: error in job generator"
I built a Spark Streaming application that continuously receives messages from Kafka and writes them into an HBase table.
The application runs fine for the first 25 minutes. When I type key-value pairs such as 1;name1 and 2;name2 into kafka-console-producer, they are saved into the HBase table:
ROW COLUMN+CELL
1 column=cf1:column-Name, timestamp=1471905340560, value=name1
2 column=cf1:column-Name, timestamp=1471905348165, value=name2
But after about 25 minutes, my application stops with the error "ERROR JobScheduler: Error in job generator". The details of this error are shown below:
16/08/29 18:01:10 ERROR JobScheduler: Error in job generator
java.lang.IllegalArgumentException: requirement failed
at scala.Predef$.require(Predef.scala:221)
at org.apache.spark.streaming.scheduler.ReceivedBlockTracker.cleanupOldBatches(ReceivedBlockTracker.scala:166)
at org.apache.spark.streaming.scheduler.ReceiverTracker.cleanupOldBlocksAndBatches(ReceiverTracker.scala:223)
at org.apache.spark.streaming.scheduler.JobGenerator.clearMetadata(JobGenerator.scala:272)
at org.apache.spark.streaming.scheduler.JobGenerator.org$apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:182)
at org.apache.spark.streaming.scheduler.JobGenerator$$anon.onReceive(JobGenerator.scala:87)
at org.apache.spark.streaming.scheduler.JobGenerator$$anon.onReceive(JobGenerator.scala:86)
at org.apache.spark.util.EventLoop$$anon.run(EventLoop.scala:48)
Exception in thread "main" java.lang.IllegalArgumentException: requirement failed at scala.Predef$.require(Predef.scala:221)
at org.apache.spark.streaming.scheduler.ReceivedBlockTracker.cleanupOldBatches(ReceivedBlockTracker.scala:166)
at org.apache.spark.streaming.scheduler.ReceiverTracker.cleanupOldBlocksAndBatches(ReceiverTracker.scala:223)
at org.apache.spark.streaming.scheduler.JobGenerator.clearMetadata(JobGenerator.scala:272)
at org.apache.spark.streaming.scheduler.JobGenerator.org$apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:182)
at org.apache.spark.streaming.scheduler.JobGenerator$$anon.onReceive(JobGenerator.scala:87)
at org.apache.spark.streaming.scheduler.JobGenerator$$anon.onReceive(JobGenerator.scala:86)
at org.apache.spark.util.EventLoop$$anon.run(EventLoop.scala:48)
16/08/29 18:01:10 INFO StreamingContext: Invoking stop(stopGracefully=false) from shutdown hook
16/08/29 18:01:10 INFO JobGenerator: Stopping JobGenerator immediately
It runs fine for the first 25 minutes, but after that, for a reason I don't understand, the job generator suddenly seems unable to run properly.
My code is as follows:
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.io.{LongWritable, Writable, IntWritable, Text}
import org.apache.hadoop.mapred.JobConf
import org.apache.hadoop.mapreduce.Job
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka._
import kafka.serializer.StringDecoder

object ReceiveKafkaAsDstream {

  case class SampleKafkaRecord(id: String, name: String)

  object SampleKafkaRecord extends Serializable {
    // Parse one "id;name" line from Kafka into a record
    def parseToSampleRecord(line: String): SampleKafkaRecord = {
      val values = line.split(";")
      SampleKafkaRecord(values(0), values(1))
    }

    // Turn a record into an HBase Put keyed by the record id
    def SampleToHbasePut(CSVData: SampleKafkaRecord): (ImmutableBytesWritable, Put) = {
      val rowKey = CSVData.id
      val putOnce = new Put(rowKey.getBytes)
      putOnce.addColumn("cf1".getBytes, "column-Name".getBytes, CSVData.name.getBytes)
      (new ImmutableBytesWritable(rowKey.getBytes), putOnce)
    }
  }

  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("ReceiveKafkaAsDstream")
    val ssc = new StreamingContext(sparkConf, Seconds(1))

    // Direct Kafka stream
    val topics = "test"
    val brokers = "10.0.2.15:6667"
    val topicSet = topics.split(",").toSet
    val kafkaParams = Map[String, String](
      "metadata.broker.list" -> brokers,
      "zookeeper.connection.timeout.ms" -> "1000")
    val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicSet)

    // HBase output via TableOutputFormat
    val tableName = "KafkaTable"
    val conf = HBaseConfiguration.create()
    conf.set(TableOutputFormat.OUTPUT_TABLE, tableName)
    conf.set("zookeeper.znode.parent", "/hbase-unsecure")
    conf.set("hbase.zookeeper.property.clientPort", "2181")
    val job = Job.getInstance(conf)
    job.setOutputKeyClass(classOf[Text])
    job.setOutputValueClass(classOf[Text])
    job.setOutputFormatClass(classOf[TableOutputFormat[Text]])

    val records = messages
      .map(_._2)
      .map(SampleKafkaRecord.parseToSampleRecord)

    records.foreachRDD { rdd =>
      rdd.map(SampleKafkaRecord.SampleToHbasePut).saveAsNewAPIHadoopDataset(job.getConfiguration)
    }
    records.print()

    ssc.start()
    ssc.awaitTermination()
  }
}
It feels like a configuration problem. Any help is appreciated.
I added a property named zookeeper.session.timeout.ms by changing the code to:
val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers,
  "zookeeper.connect" -> "xxxxxx:2181",
  "zookeeper.connection.timeout.ms" -> "10000",
  "zookeeper.session.timeout.ms" -> "10000")
and I set the Spark Streaming batch interval to 10 seconds, as sketched below.
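That change is only the second argument of the StreamingContext constructor (a minimal sketch, reusing the same sparkConf as in the code above):

val sparkConf = new SparkConf().setAppName("ReceiveKafkaAsDstream")
// Batch interval raised from Seconds(1) to Seconds(10)
val ssc = new StreamingContext(sparkConf, Seconds(10))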
With these changes my Spark Streaming application can keep running for a long time.
However, when I check the memory usage it still keeps decreasing, and I don't know how to solve that.
This is most likely a clock synchronization problem. Try enabling NTP to make sure all cluster nodes are synchronized to the same time.
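The requirement that fails is in ReceivedBlockTracker.cleanupOldBatches (see the stack trace above). Roughly, it is a timestamp sanity check like the simplified sketch below (a paraphrase, not the exact Spark source), which is why a node clock that drifts or gets set backwards can trip it:

// Simplified reconstruction of the failing check (assumption: paraphrased
// from Spark 1.x ReceivedBlockTracker, not copied verbatim).
def cleanupOldBatches(cleanupThreshTimeMs: Long, currentTimeMs: Long): Unit = {
  // The cleanup threshold is derived from past batch times. If the system
  // clock is moved backwards by an unsynchronized time source, the threshold
  // can end up >= the current time and this call throws
  // "java.lang.IllegalArgumentException: requirement failed".
  require(cleanupThreshTimeMs < currentTimeMs)
  // ... drop metadata for batches older than the threshold ...
}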