Spark Streaming 1.6 + Kafka:处于 "queued" 状态的批次过多
Spark Streaming 1.6 + Kafka: Too many batches in "queued" status
我正在使用 spark streaming 来消费来自 Kafka 主题的消息,该主题有 10 个分区。我正在使用直接方法从 kafka 消费,代码可以在下面找到:
def createStreamingContext(conf: Conf): StreamingContext = {
val dateFormat = conf.dateFormat.apply
val hiveTable = conf.tableName.apply
val sparkConf = new SparkConf()
sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
sparkConf.set("spark.driver.allowMultipleContexts", "true")
val sc = SparkContextBuilder.build(Some(sparkConf))
val ssc = new StreamingContext(sc, Seconds(conf.batchInterval.apply))
val kafkaParams = Map[String, String](
"bootstrap.servers" -> conf.kafkaBrokers.apply,
"key.deserializer" -> classOf[StringDeserializer].getName,
"value.deserializer" -> classOf[StringDeserializer].getName,
"auto.offset.reset" -> "smallest",
"enable.auto.commit" -> "false"
)
val directKafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
ssc,
kafkaParams,
conf.topics.apply().split(",").toSet[String]
)
val windowedKafkaStream = directKafkaStream.window(Seconds(conf.windowDuration.apply))
ssc.checkpoint(conf.sparkCheckpointDir.apply)
val eirRDD: DStream[Row] = windowedKafkaStream.map { kv =>
val fields: Array[String] = kv._2.split(",")
createDomainObject(fields, dateFormat)
}
eirRDD.foreachRDD { rdd =>
val schema = SchemaBuilder.build()
val sqlContext: HiveContext = HiveSQLContext.getInstance(Some(rdd.context))
val eirDF: DataFrame = sqlContext.createDataFrame(rdd, schema)
eirDF
.select(schema.map(c => col(c.name)): _*)
.write
.mode(SaveMode.Append)
.partitionBy("year", "month", "day")
.insertInto(hiveTable)
}
ssc
}
从代码可以看出,我是用window实现的(不对请指正):由于有一个action插入配置单元 table,我想避免过于频繁地写入 HDFS,所以我想要的是在内存中保存足够的数据,然后才写入文件系统。我认为使用 window 是实现它的正确方法。
现在,在下图中,您可以看到有很多批次正在排队,而正在处理的批次需要很长时间才能完成。
我还提供了正在处理的单个批次的详细信息:
为什么插入操作有这么多任务,而批处理中的事件并不多?有时,0 个事件也会生成数千个需要永远才能完成的任务。
我用Spark处理微批的方式有问题吗?
感谢您的帮助!
一些额外的细节:
Yarn 容器最大为 2gb。
在这个 Yarn 队列中,容器的最大数量是 10。
当我查看正在执行此 spark 应用程序的队列的详细信息时,容器的数量非常大,大约有 15k 个待处理的容器。
好吧,我终于想通了。显然 Spark Streaming 不能处理空事件,所以在代码的 foreachRDD 部分中,我添加了以下内容:
eirRDD.foreachRDD { rdd =>
if (rdd.take(1).length != 0) {
//do action
}
}
这样我们就可以跳过空的微批次。 isEmpty() 方法不起作用。
希望这对其他人有帮助! ;)
我正在使用 spark streaming 来消费来自 Kafka 主题的消息,该主题有 10 个分区。我正在使用直接方法从 kafka 消费,代码可以在下面找到:
def createStreamingContext(conf: Conf): StreamingContext = {
val dateFormat = conf.dateFormat.apply
val hiveTable = conf.tableName.apply
val sparkConf = new SparkConf()
sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
sparkConf.set("spark.driver.allowMultipleContexts", "true")
val sc = SparkContextBuilder.build(Some(sparkConf))
val ssc = new StreamingContext(sc, Seconds(conf.batchInterval.apply))
val kafkaParams = Map[String, String](
"bootstrap.servers" -> conf.kafkaBrokers.apply,
"key.deserializer" -> classOf[StringDeserializer].getName,
"value.deserializer" -> classOf[StringDeserializer].getName,
"auto.offset.reset" -> "smallest",
"enable.auto.commit" -> "false"
)
val directKafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
ssc,
kafkaParams,
conf.topics.apply().split(",").toSet[String]
)
val windowedKafkaStream = directKafkaStream.window(Seconds(conf.windowDuration.apply))
ssc.checkpoint(conf.sparkCheckpointDir.apply)
val eirRDD: DStream[Row] = windowedKafkaStream.map { kv =>
val fields: Array[String] = kv._2.split(",")
createDomainObject(fields, dateFormat)
}
eirRDD.foreachRDD { rdd =>
val schema = SchemaBuilder.build()
val sqlContext: HiveContext = HiveSQLContext.getInstance(Some(rdd.context))
val eirDF: DataFrame = sqlContext.createDataFrame(rdd, schema)
eirDF
.select(schema.map(c => col(c.name)): _*)
.write
.mode(SaveMode.Append)
.partitionBy("year", "month", "day")
.insertInto(hiveTable)
}
ssc
}
从代码可以看出,我是用window实现的(不对请指正):由于有一个action插入配置单元 table,我想避免过于频繁地写入 HDFS,所以我想要的是在内存中保存足够的数据,然后才写入文件系统。我认为使用 window 是实现它的正确方法。
现在,在下图中,您可以看到有很多批次正在排队,而正在处理的批次需要很长时间才能完成。
我还提供了正在处理的单个批次的详细信息:
为什么插入操作有这么多任务,而批处理中的事件并不多?有时,0 个事件也会生成数千个需要永远才能完成的任务。
我用Spark处理微批的方式有问题吗?
感谢您的帮助!
一些额外的细节:
Yarn 容器最大为 2gb。 在这个 Yarn 队列中,容器的最大数量是 10。 当我查看正在执行此 spark 应用程序的队列的详细信息时,容器的数量非常大,大约有 15k 个待处理的容器。
好吧,我终于想通了。显然 Spark Streaming 不能处理空事件,所以在代码的 foreachRDD 部分中,我添加了以下内容:
eirRDD.foreachRDD { rdd =>
if (rdd.take(1).length != 0) {
//do action
}
}
这样我们就可以跳过空的微批次。 isEmpty() 方法不起作用。
希望这对其他人有帮助! ;)