当状态数据增长时,Spark Structured Streaming 如何处理内存中的状态?
How does Spark Structured Streaming handle in-memory state when state data is growing?
在 Spark Structured Streaming(版本 2.2.0)中,如果使用 mapGroupsWithState
查询 Update mode 作为输出模式,似乎 Spark 正在存储使用 java.util.ConcurrentHashMap
数据结构的内存状态数据。有人可以向我详细解释当状态数据增长并且内存不足时会发生什么吗?此外,是否可以使用 spark 配置参数更改在内存中存储状态数据的限制?
Can someone explain to me in detail that what happens when the state
data grows and there isn't enough memory anymore
执行器将因 OOM 异常而崩溃。由于使用 mapGroupWithState
,您是负责添加和删除状态的人,如果您用无法为其分配内存的数据使 JVM 不堪重负,进程将会崩溃。
is it possible to change the limit for storing the state data in the
memory, using a spark config parameter?
无法限制存储在内存中的字节数。同样,如果这是 mapGroupsWithState
,您必须以不会导致 JVM 出现 OOM 的方式管理状态,例如设置超时和删除状态。如果我们谈论的是 Spark 为您管理状态的有状态聚合,例如 agg
组合器,那么您可以使用 watermark 来限制状态,这将在时间范围内从内存中逐出旧数据通过。
现有的 State Store 实现使用内存中的 HashMaps(用于存储)+ HDFS(用于容错)
HashMap 是版本化的(每个微批次一个)。在工作程序的执行程序内存中,每个聚合分区的每个版本都有一个单独的键值映射。 (要维护的版本数是可配置的)
回答你的问题:
Can someone explain to me in detail that what happens when the state data grows and there isn't enough memory anymore.
状态存储 HashMaps 与随机任务共享执行程序内存。因此,随着状态增长或洗牌任务需要更多内存,频繁的 GC 和 OOM 将发生导致执行程序失败。
is it possible to change the limit for storing the state data in the memory, using a spark config parameter?
目前这是不可能的。您只能指定将由状态存储和执行程序任务共享的执行程序内存,我们无法在它们之间划分内存。这实际上使当前的实现在数据突然爆发的情况下变得不可靠,在这种情况下即使是水印也无济于事。
如果有兴趣了解状态存储在结构化流中的内部工作方式,这篇文章可能会有用:https://www.linkedin.com/pulse/state-management-spark-structured-streaming-chandan-prakash/
p.s。我是作者
在 Spark Structured Streaming(版本 2.2.0)中,如果使用 mapGroupsWithState
查询 Update mode 作为输出模式,似乎 Spark 正在存储使用 java.util.ConcurrentHashMap
数据结构的内存状态数据。有人可以向我详细解释当状态数据增长并且内存不足时会发生什么吗?此外,是否可以使用 spark 配置参数更改在内存中存储状态数据的限制?
Can someone explain to me in detail that what happens when the state data grows and there isn't enough memory anymore
执行器将因 OOM 异常而崩溃。由于使用 mapGroupWithState
,您是负责添加和删除状态的人,如果您用无法为其分配内存的数据使 JVM 不堪重负,进程将会崩溃。
is it possible to change the limit for storing the state data in the memory, using a spark config parameter?
无法限制存储在内存中的字节数。同样,如果这是 mapGroupsWithState
,您必须以不会导致 JVM 出现 OOM 的方式管理状态,例如设置超时和删除状态。如果我们谈论的是 Spark 为您管理状态的有状态聚合,例如 agg
组合器,那么您可以使用 watermark 来限制状态,这将在时间范围内从内存中逐出旧数据通过。
现有的 State Store 实现使用内存中的 HashMaps(用于存储)+ HDFS(用于容错) HashMap 是版本化的(每个微批次一个)。在工作程序的执行程序内存中,每个聚合分区的每个版本都有一个单独的键值映射。 (要维护的版本数是可配置的) 回答你的问题:
Can someone explain to me in detail that what happens when the state data grows and there isn't enough memory anymore.
状态存储 HashMaps 与随机任务共享执行程序内存。因此,随着状态增长或洗牌任务需要更多内存,频繁的 GC 和 OOM 将发生导致执行程序失败。
is it possible to change the limit for storing the state data in the memory, using a spark config parameter?
目前这是不可能的。您只能指定将由状态存储和执行程序任务共享的执行程序内存,我们无法在它们之间划分内存。这实际上使当前的实现在数据突然爆发的情况下变得不可靠,在这种情况下即使是水印也无济于事。
如果有兴趣了解状态存储在结构化流中的内部工作方式,这篇文章可能会有用:https://www.linkedin.com/pulse/state-management-spark-structured-streaming-chandan-prakash/
p.s。我是作者