Spark streaming mapWithState timeout delayed?
I expected the new mapWithState API of Spark 1.6+ to remove objects that time out near-immediately, but there is a delay.
I'm testing the API with the adapted version of JavaStatefulNetworkWordCount below:
SparkConf sparkConf = new SparkConf()
    .setAppName("JavaStatefulNetworkWordCount")
    .setMaster("local[*]");
JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, Durations.seconds(1));
ssc.checkpoint("./tmp");

StateSpec<String, Integer, Integer, Tuple2<String, Integer>> mappingFunc =
    StateSpec.function((word, one, state) -> {
        if (state.isTimingOut())
        {
            System.out.println("Timing out the word: " + word);
            return new Tuple2<String, Integer>(word, state.get());
        }
        else
        {
            int sum = one.or(0) + (state.exists() ? state.get() : 0);
            Tuple2<String, Integer> output = new Tuple2<String, Integer>(word, sum);
            state.update(sum);
            return output;
        }
    });

JavaMapWithStateDStream<String, Integer, Integer, Tuple2<String, Integer>> stateDstream =
    ssc.socketTextStream(args[0], Integer.parseInt(args[1]),
            StorageLevels.MEMORY_AND_DISK_SER_2)
        .flatMap(x -> Arrays.asList(SPACE.split(x)))
        .mapToPair(w -> new Tuple2<String, Integer>(w, 1))
        .mapWithState(mappingFunc.timeout(Durations.seconds(5)));

stateDstream.stateSnapshots().print();
Together with nc (nc -l -p <port>).
When I type a word into the nc window, I see the tuple being printed in the console every second. But the timing-out message does not seem to get printed 5 seconds later, as expected from the timeout that was set. The time it takes for the tuple to expire seems to vary between 5 and 20 seconds.
Am I missing some configuration option, or is the timeout perhaps only performed at the same time as snapshots?
Every time mapWithState is invoked (with your configuration, around every 1 second), the MapWithStateRDD will internally check for expired records and time them out. You can see it in the code:
// Get the timed out state records, call the mapping function on each and collect the
// data returned
if (removeTimedoutData && timeoutThresholdTime.isDefined) {
  newStateMap.getByTime(timeoutThresholdTime.get).foreach { case (key, state, _) =>
    wrappedState.wrapTimingOutState(state)
    val returned = mappingFunction(batchTime, key, None, wrappedState)
    mappedData ++= returned
    newStateMap.remove(key)
  }
}
(Other than the time it takes to execute each job, it turns out that newStateMap.remove(key) actually only marks records for deletion. See "Edit" for more.)
You have to take into account the time it takes for each stage to be scheduled, plus the time it takes for each execution of such a stage to actually get its turn to run. It isn't exact, because this runs as a distributed system where other factors can come into play, making your timeouts more or less accurate than you expect.
Edit
As @etov rightly points out, newStateMap.remove(key) doesn't actually remove the element from the OpenHashMapBasedStateMap[K, S], but simply marks it for deletion. This is also why you see the expiration times adding up.
The actual relevant code is here:
// Write the data in the parent state map while
// copying the data into a new parent map for compaction (if needed)
val doCompaction = shouldCompact
val newParentSessionStore = if (doCompaction) {
  val initCapacity = if (approxSize > 0) approxSize else 64
  new OpenHashMapBasedStateMap[K, S](initialCapacity = initCapacity, deltaChainThreshold)
} else { null }

val iterOfActiveSessions = parentStateMap.getAll()
var parentSessionCount = 0

// First write the approximate size of the data to be written, so that readObject can
// allocate appropriately sized OpenHashMap.
outputStream.writeInt(approxSize)

while(iterOfActiveSessions.hasNext) {
  parentSessionCount += 1
  val (key, state, updateTime) = iterOfActiveSessions.next()
  outputStream.writeObject(key)
  outputStream.writeObject(state)
  outputStream.writeLong(updateTime)
  if (doCompaction) {
    newParentSessionStore.deltaMap.update(
      key, StateInfo(state, updateTime, deleted = false))
  }
}

// Write the final limit marking object with the correct count of records written.
val limiterObj = new LimitMarker(parentSessionCount)
outputStream.writeObject(limiterObj)
if (doCompaction) {
  parentStateMap = newParentSessionStore
}
If the deltaMap should be compacted (marked by the doCompaction variable), then (and only then) is the map cleared of all the deleted instances. How often does that happen? Once the delta chain exceeds a threshold:
val DELTA_CHAIN_LENGTH_THRESHOLD = 20
Which means the delta chain is longer than 20 items, and there are items that have been marked for deletion.
Once an event times out, it is NOT deleted right away, but is only marked for deletion by saving it to the 'deltaMap':
override def remove(key: K): Unit = {
  val stateInfo = deltaMap(key)
  if (stateInfo != null) {
    stateInfo.markDeleted()
  } else {
    val newInfo = new StateInfo[S](deleted = true)
    deltaMap.update(key, newInfo)
  }
}
Then, timed-out events are collected and sent to the output stream only at checkpoints. That is: events that time out at batch t will appear in the output stream only at the next checkpoint. With the default checkpoint interval of 10 batch intervals, that wait ranges from 0 to 10 batch intervals, so about 5 on average, i.e. around batch t+5:
override def checkpoint(): Unit = {
  super.checkpoint()
  doFullScan = true
}
...
removeTimedoutData = doFullScan // remove timedout data only when full scan is enabled
...
// Get the timed out state records, call the mapping function on each and collect the
// data returned
if (removeTimedoutData && timeoutThresholdTime.isDefined) {
...
Elements are only actually removed when there are enough of them and the state map is being serialized, which currently also happens only at checkpoints:
/** Whether the delta chain length is long enough that it should be compacted */
def shouldCompact: Boolean = {
  deltaChainLength >= deltaChainThreshold
}
// Write the data in the parent state map while copying the data into a new parent map for
// compaction (if needed)
val doCompaction = shouldCompact
...
By default, checkpointing happens every 10 iterations, so in the example above every 10 seconds; since your timeout is 5 seconds, events are expected to expire somewhere within 5-15 seconds.
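One practical knob that follows from this (not mentioned above, so treat it as my own sketch): as far as I can tell from the Spark source, MapWithStateDStream forwards DStream.checkpoint(interval) to its internal state stream, so you can set an explicit, shorter checkpoint interval instead of relying on the 10-batch default. The snippet below slots into the question's program; the 2-second value is just an illustration, and the interval must be a multiple of the batch interval:

JavaMapWithStateDStream<String, Integer, Integer, Tuple2<String, Integer>> stateDstream =
    ssc.socketTextStream(args[0], Integer.parseInt(args[1]),
            StorageLevels.MEMORY_AND_DISK_SER_2)
        .flatMap(x -> Arrays.asList(SPACE.split(x)))
        .mapToPair(w -> new Tuple2<String, Integer>(w, 1))
        .mapWithState(mappingFunc.timeout(Durations.seconds(5)));

// Checkpoint the state stream every 2 batches instead of the default 10, so the full
// scan that flushes timed-out keys runs more often. More frequent checkpointing also
// means more I/O, so this is a trade-off rather than a free win.
stateDstream.checkpoint(Durations.seconds(2));

stateDstream.stateSnapshots().print();

With a 1-second batch and a 2-second checkpoint interval, a key with a 5-second timeout should then surface roughly 5-7 seconds after its last update instead of 5-15, subject to the scheduling jitter described above.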
EDIT: Answer corrected and elaborated following comments by @YuvalItzchakov.