Stream-Static Join:如何定期刷新(unpersist/persist)静态Dataframe
Stream-Static Join: How to refresh (unpersist/persist) static Dataframe periodically
我正在构建一个 Spark Structured Streaming 应用程序,我正在其中进行批处理流连接。批处理数据的来源会定期更新。
所以,我计划定期对那批数据进行 persist/unpersist。
下面是我用来保存和取消保存批处理数据的示例代码。
流量:
- 读取批量数据
- 持久化批量数据
- 每隔一小时,取消持久化数据并读取批量数据并再次持久化。
但是,我没有看到每小时刷新一次批处理数据。
代码:
var batchDF = handler.readBatchDF(sparkSession)
batchDF.persist(StorageLevel.MEMORY_AND_DISK)
var refreshedTime: Instant = Instant.now()
if (Duration.between(refreshedTime, Instant.now()).getSeconds > refreshTime) {
refreshedTime = Instant.now()
batchDF.unpersist(false)
batchDF = handler.readBatchDF(sparkSession)
.persist(StorageLevel.MEMORY_AND_DISK)
}
在 spark structured streaming 作业中有没有更好的方法来实现这种情况?
您可以通过使用结构化流提供的流调度功能来做到这一点。
您可以通过创建一个定期刷新静态数据帧的人工“速率”流来触发静态数据帧的刷新(取消持久化 -> 加载 -> 持久化)。想法是:
- 最初加载静态Dataframe并保持
var
- 定义一个刷新静态Dataframe的方法
- 使用按要求的时间间隔(例如 1 小时)触发的“速率”流
- 读取实际流数据并与静态Dataframe进行join操作
- 在该速率流中有一个
foreachBatch
接收器调用在步骤 2 中创建的更新方法。
以下代码在 Spark 3.0.1、Scala 2.12.10 和 Delta 0.7.0 上运行良好。
// 1. Load the staticDataframe initially and keep as `var`
var staticDf = spark.read.format("delta").load(deltaPath)
staticDf.persist()
// 2. Define a method that refreshes the static Dataframe
def foreachBatchMethod[T](batchDf: Dataset[T], batchId: Long) = {
staticDf.unpersist()
staticDf = spark.read.format("delta").load(deltaPath)
staticDf.persist()
println(s"${Calendar.getInstance().getTime}: Refreshing static Dataframe from DeltaLake")
}
// 3. Use a "Rate" Stream that gets triggered at the required interval (e.g. 1 hour)
val staticRefreshStream = spark.readStream
.format("rate")
.option("rowsPerSecond", 1)
.option("numPartitions", 1)
.load()
.selectExpr("CAST(value as LONG) as trigger")
.as[Long]
// 4. Read actual streaming data and perform join operation with static Dataframe
// As an example I used Kafka as a streaming source
val streamingDf = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "test")
.option("startingOffsets", "earliest")
.option("failOnDataLoss", "false")
.load()
.selectExpr("CAST(value AS STRING) as id", "offset as streamingField")
val joinDf = streamingDf.join(staticDf, "id")
val query = joinDf.writeStream
.format("console")
.option("truncate", false)
.option("checkpointLocation", "/path/to/sparkCheckpoint")
.start()
// 5. Within that Rate Stream have a `foreachBatch` sink that calls refresher method
staticRefreshStream.writeStream
.outputMode("append")
.foreachBatch(foreachBatchMethod[Long] _)
.queryName("RefreshStream")
.trigger(Trigger.ProcessingTime("5 seconds")) // or e.g. 1 hour
.start()
为了获得一个完整的示例,增量 table 已创建并使用新值进行了更新,如下所示:
val deltaPath = "file:///tmp/delta/table"
import spark.implicits._
val df = Seq(
(1L, "static1"),
(2L, "static2")
).toDF("id", "deltaField")
df.write
.mode(SaveMode.Overwrite)
.format("delta")
.save(deltaPath)
我正在构建一个 Spark Structured Streaming 应用程序,我正在其中进行批处理流连接。批处理数据的来源会定期更新。
所以,我计划定期对那批数据进行 persist/unpersist。
下面是我用来保存和取消保存批处理数据的示例代码。
流量:
- 读取批量数据
- 持久化批量数据
- 每隔一小时,取消持久化数据并读取批量数据并再次持久化。
但是,我没有看到每小时刷新一次批处理数据。
代码:
var batchDF = handler.readBatchDF(sparkSession)
batchDF.persist(StorageLevel.MEMORY_AND_DISK)
var refreshedTime: Instant = Instant.now()
if (Duration.between(refreshedTime, Instant.now()).getSeconds > refreshTime) {
refreshedTime = Instant.now()
batchDF.unpersist(false)
batchDF = handler.readBatchDF(sparkSession)
.persist(StorageLevel.MEMORY_AND_DISK)
}
在 spark structured streaming 作业中有没有更好的方法来实现这种情况?
您可以通过使用结构化流提供的流调度功能来做到这一点。
您可以通过创建一个定期刷新静态数据帧的人工“速率”流来触发静态数据帧的刷新(取消持久化 -> 加载 -> 持久化)。想法是:
- 最初加载静态Dataframe并保持
var
- 定义一个刷新静态Dataframe的方法
- 使用按要求的时间间隔(例如 1 小时)触发的“速率”流
- 读取实际流数据并与静态Dataframe进行join操作
- 在该速率流中有一个
foreachBatch
接收器调用在步骤 2 中创建的更新方法。
以下代码在 Spark 3.0.1、Scala 2.12.10 和 Delta 0.7.0 上运行良好。
// 1. Load the staticDataframe initially and keep as `var`
var staticDf = spark.read.format("delta").load(deltaPath)
staticDf.persist()
// 2. Define a method that refreshes the static Dataframe
def foreachBatchMethod[T](batchDf: Dataset[T], batchId: Long) = {
staticDf.unpersist()
staticDf = spark.read.format("delta").load(deltaPath)
staticDf.persist()
println(s"${Calendar.getInstance().getTime}: Refreshing static Dataframe from DeltaLake")
}
// 3. Use a "Rate" Stream that gets triggered at the required interval (e.g. 1 hour)
val staticRefreshStream = spark.readStream
.format("rate")
.option("rowsPerSecond", 1)
.option("numPartitions", 1)
.load()
.selectExpr("CAST(value as LONG) as trigger")
.as[Long]
// 4. Read actual streaming data and perform join operation with static Dataframe
// As an example I used Kafka as a streaming source
val streamingDf = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "test")
.option("startingOffsets", "earliest")
.option("failOnDataLoss", "false")
.load()
.selectExpr("CAST(value AS STRING) as id", "offset as streamingField")
val joinDf = streamingDf.join(staticDf, "id")
val query = joinDf.writeStream
.format("console")
.option("truncate", false)
.option("checkpointLocation", "/path/to/sparkCheckpoint")
.start()
// 5. Within that Rate Stream have a `foreachBatch` sink that calls refresher method
staticRefreshStream.writeStream
.outputMode("append")
.foreachBatch(foreachBatchMethod[Long] _)
.queryName("RefreshStream")
.trigger(Trigger.ProcessingTime("5 seconds")) // or e.g. 1 hour
.start()
为了获得一个完整的示例,增量 table 已创建并使用新值进行了更新,如下所示:
val deltaPath = "file:///tmp/delta/table"
import spark.implicits._
val df = Seq(
(1L, "static1"),
(2L, "static2")
).toDF("id", "deltaField")
df.write
.mode(SaveMode.Overwrite)
.format("delta")
.save(deltaPath)