是否可以直接在 Spark worker 中创建变量?
Is it possible to create a variable directly in Spark workers?
我想做的是在每个 Spark worker 中生成一个上下文,我可以将其用于本地查找。
查找数据位于数据库中,我想将其缓存在每个工作人员上。有没有简单的方法可以做到这一点?
使用的解决方法:
- 创建一个延迟初始化的
Broadcast
变量并将其用于我的函数。函数第一次尝试访问它时,我调用我的 SQL 代码来初始化它。
- 创建一个急切初始化的
Broadcast
并使用 torrent 广播使其在工作人员中可用
PS。我没有使用 JdbcRDD
因为我希望数据被复制而不是分区。有谁知道如果我不使用 JdbcRDD
的分区属性会发生什么?这会使其正常工作还是会产生不确定的行为?
您可以创建一个单例对象,其中包含对您要使用的解析缓存的引用:
object ResolutionCache {
var connection = _
var cache: Map[Key,Value] = Map()
def resolve(key:Key):Value = ???
}
那么这个对象可以用来解析RDD操作中的值:
val resolved = keysRDD.map(key => (key -> ResolutionCache.resolve(key)))
此对象持有的连接和值将由每个工作 JVM 独立维护。
我们必须特别注意连接管理和并发行为。特别是,resolve
必须是线程安全的。
我想做的是在每个 Spark worker 中生成一个上下文,我可以将其用于本地查找。 查找数据位于数据库中,我想将其缓存在每个工作人员上。有没有简单的方法可以做到这一点?
使用的解决方法:
- 创建一个延迟初始化的
Broadcast
变量并将其用于我的函数。函数第一次尝试访问它时,我调用我的 SQL 代码来初始化它。 - 创建一个急切初始化的
Broadcast
并使用 torrent 广播使其在工作人员中可用
PS。我没有使用 JdbcRDD
因为我希望数据被复制而不是分区。有谁知道如果我不使用 JdbcRDD
的分区属性会发生什么?这会使其正常工作还是会产生不确定的行为?
您可以创建一个单例对象,其中包含对您要使用的解析缓存的引用:
object ResolutionCache {
var connection = _
var cache: Map[Key,Value] = Map()
def resolve(key:Key):Value = ???
}
那么这个对象可以用来解析RDD操作中的值:
val resolved = keysRDD.map(key => (key -> ResolutionCache.resolve(key)))
此对象持有的连接和值将由每个工作 JVM 独立维护。
我们必须特别注意连接管理和并发行为。特别是,resolve
必须是线程安全的。