Spark Structure 流式读取每个微批次的数据两次。如何避免
Spark Structure streaming read data twice per every micro-batch. How to avoid
我对 spark 结构流有一个非常奇怪的问题。
Spark structure streaming 为每个微批次创建两个 spark 作业。
结果,两次从Kafka读取数据。
这是一个简单的代码片段。
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.Trigger
object CheckHowSparkReadFromKafka {
def main(args: Array[String]): Unit = {
val session = SparkSession.builder()
.config(new SparkConf()
.setAppName(s"simple read from kafka with repartition")
.setMaster("local[*]")
.set("spark.driver.host", "localhost"))
.getOrCreate()
val testPath = "/tmp/spark-test"
FileSystem.get(session.sparkContext.hadoopConfiguration).delete(new Path(testPath), true)
import session.implicits._
val stream = session
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "kafka-20002-prod:9092")
.option("subscribe", "topic")
.option("maxOffsetsPerTrigger", 1000)
.option("failOnDataLoss", false)
.option("startingOffsets", "latest")
.load()
.repartitionByRange( $"offset")
.writeStream
.option("path", testPath + "/data")
.option("checkpointLocation", testPath + "/checkpoint")
.format("parquet")
.trigger(Trigger.ProcessingTime(10.seconds))
.start()
stream.processAllAvailable()
发生这种情况是因为如果 .repartitionByRange( $"offset")
,如果我删除此行,一切都会好起来的。
但是使用 spark 创建两个作业,一个具有 1 个阶段,仅从 Kafka 读取,第二个具有 3 个阶段读取 -> 随机播放 -> 写入。
所以第一份工作的结果从未使用过。
这对性能有重大影响。
我的一些 Kafka 主题有 1550 个分区,所以读两遍很重要。
如果我添加缓存,事情会变得更好,但这对我来说不是一种方式。
在本地模式下,批处理中的第一个作业耗时不到 0.1 毫秒,索引为 0 的批处理除外。但在 YARN 集群和 Messos 中,两个作业都完全符合预期,并且在我的主题上耗时将近 1.2 分钟。
为什么会这样?我怎样才能避免这种情况?看起来像虫子?
P.S。我使用 spark 2.4.3.
本例spark没有bug。
两次从 Kafka 读取此数据的根本原因非常简单。
repartitionByRange
函数生成两个 spark 作业。
一个用于实际重新分区。
一个用于采样以找到分区的边界。
中查找更多详细信息
我对 spark 结构流有一个非常奇怪的问题。 Spark structure streaming 为每个微批次创建两个 spark 作业。 结果,两次从Kafka读取数据。 这是一个简单的代码片段。
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.Trigger
object CheckHowSparkReadFromKafka {
def main(args: Array[String]): Unit = {
val session = SparkSession.builder()
.config(new SparkConf()
.setAppName(s"simple read from kafka with repartition")
.setMaster("local[*]")
.set("spark.driver.host", "localhost"))
.getOrCreate()
val testPath = "/tmp/spark-test"
FileSystem.get(session.sparkContext.hadoopConfiguration).delete(new Path(testPath), true)
import session.implicits._
val stream = session
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "kafka-20002-prod:9092")
.option("subscribe", "topic")
.option("maxOffsetsPerTrigger", 1000)
.option("failOnDataLoss", false)
.option("startingOffsets", "latest")
.load()
.repartitionByRange( $"offset")
.writeStream
.option("path", testPath + "/data")
.option("checkpointLocation", testPath + "/checkpoint")
.format("parquet")
.trigger(Trigger.ProcessingTime(10.seconds))
.start()
stream.processAllAvailable()
发生这种情况是因为如果 .repartitionByRange( $"offset")
,如果我删除此行,一切都会好起来的。
但是使用 spark 创建两个作业,一个具有 1 个阶段,仅从 Kafka 读取,第二个具有 3 个阶段读取 -> 随机播放 -> 写入。
所以第一份工作的结果从未使用过。
这对性能有重大影响。 我的一些 Kafka 主题有 1550 个分区,所以读两遍很重要。 如果我添加缓存,事情会变得更好,但这对我来说不是一种方式。 在本地模式下,批处理中的第一个作业耗时不到 0.1 毫秒,索引为 0 的批处理除外。但在 YARN 集群和 Messos 中,两个作业都完全符合预期,并且在我的主题上耗时将近 1.2 分钟。
为什么会这样?我怎样才能避免这种情况?看起来像虫子?
P.S。我使用 spark 2.4.3.
本例spark没有bug。
两次从 Kafka 读取此数据的根本原因非常简单。
repartitionByRange
函数生成两个 spark 作业。
一个用于实际重新分区。
一个用于采样以找到分区的边界。
中查找更多详细信息