Why is a new batch triggered without getting any new offsets in the streaming source?
I have multiple Spark Structured Streaming jobs, and the behavior I usually see is that a new batch is triggered only when there are new offsets in Kafka, which is the source used to create the streaming query.
But when I run this example, which uses mapGroupsWithState to demonstrate arbitrary stateful operations, I see a new batch being triggered even though there is no new data in the streaming source. Why does this happen, and can it be avoided?
Update-1
I modified the example code above and removed the state-related operations such as updating/removing state; the function now simply returns zero. But a batch is still triggered every 10 seconds, even though no new data is sent to the netcat server.
import java.sql.Timestamp

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming._

object Stateful {

  def main(args: Array[String]): Unit = {

    val host = "localhost"
    val port = "9999"

    val spark = SparkSession
      .builder
      .appName("StructuredSessionization")
      .master("local[2]")
      .getOrCreate()

    import spark.implicits._

    // Create DataFrame representing the stream of input lines from connection to host:port
    val lines = spark.readStream
      .format("socket")
      .option("host", host)
      .option("port", port)
      .option("includeTimestamp", true)
      .load()

    // Split the lines into words, treat words as sessionId of events
    val events = lines
      .as[(String, Timestamp)]
      .flatMap { case (line, timestamp) =>
        line.split(" ").map(word => Event(sessionId = word, timestamp))
      }

    // Stateful operator with a processing-time timeout; the function itself does nothing
    val sessionUpdates = events
      .groupByKey(event => event.sessionId)
      .mapGroupsWithState[SessionInfo, Int](GroupStateTimeout.ProcessingTimeTimeout) {
        case (sessionId: String, events: Iterator[Event], state: GroupState[SessionInfo]) =>
          0
      }

    val query = sessionUpdates
      .writeStream
      .outputMode("update")
      .trigger(Trigger.ProcessingTime("10 seconds"))
      .format("console")
      .start()

    query.awaitTermination()
  }
}

case class Event(sessionId: String, timestamp: Timestamp)

case class SessionInfo(
    numEvents: Int,
    startTimestampMs: Long,
    endTimestampMs: Long)
The reason for the empty batches is the timeout used in the mapGroupsWithState call.
According to the book Learning Spark 2.0:
"The next micro-batch will call the function on this timed-out key even if there is no data for that key in that micro-batch. [...] Since the timeouts are processed during the micro-batches, the timing of their execution is imprecise and depends heavily on the trigger interval [...]."
Since you have set the timeout to GroupStateTimeout.ProcessingTimeTimeout, it is aligned with your query's trigger interval of 10 seconds. An alternative is to base the timeout on event time instead (i.e. GroupStateTimeout.EventTimeTimeout).
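If you want to try the event-time variant, the shape would be roughly the sketch below. This is a minimal, hedged sketch built only on the public Dataset/GroupState API and reusing the Event/SessionInfo types from your code; the 30-minute watermark delay and session gap are made-up values, not anything from your query:

// Hedged sketch: event-time timeouts require a watermark on the event-time column.
val sessionUpdates = events
  .withWatermark("timestamp", "30 minutes")  // hypothetical watermark delay
  .groupByKey(_.sessionId)
  .mapGroupsWithState[SessionInfo, Int](GroupStateTimeout.EventTimeTimeout) {
    case (sessionId, events, state) =>
      val eventTimesMs = events.map(_.timestamp.getTime).toSeq
      if (eventTimesMs.nonEmpty) {
        // Keep some state and arm a timeout relative to the latest event time;
        // it fires only once the watermark passes that point, rather than being
        // tied to the 10-second processing-time trigger.
        state.update(SessionInfo(eventTimesMs.size, eventTimesMs.min, eventTimesMs.max))
        state.setTimeoutTimestamp(eventTimesMs.max, "30 minutes")  // hypothetical session gap
      }
      0
  }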
The ScalaDocs for GroupState provide more details:
When the timeout occurs for a group, the function is called for that group with no values, and GroupState.hasTimedOut() set to true.
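For completeness, this is roughly how the function body could distinguish such a timed-out invocation from one that carries new data, if you stay with processing-time timeouts. Again a hedged sketch reusing the names from your code; the 30-second duration and the session-info bookkeeping are arbitrary example choices:

val sessionUpdates = events
  .groupByKey(_.sessionId)
  .mapGroupsWithState[SessionInfo, Int](GroupStateTimeout.ProcessingTimeTimeout) {
    case (sessionId, events, state) =>
      if (state.hasTimedOut) {
        // Invoked with an empty iterator because the timeout fired, not because new data arrived.
        state.remove()
        0
      } else {
        // Normal invocation: this key received data in the current micro-batch.
        val batch = events.toSeq
        val times = batch.map(_.timestamp.getTime)
        state.update(SessionInfo(batch.size, times.min, times.max))
        state.setTimeoutDuration("30 seconds")  // arbitrary example duration, not from the question
        batch.size
      }
  }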