由于 Jedis Pool,Spark 程序卡住了
Spark program stucks because of Jedis Pool
根据我对 Jedis 和线程的 。我更改了我的代码以使用 JedisPool
而不是 Jedis
。但是仍然程序随着线程的增加而卡住。我尝试增加 .setMaxIdle(8000)
和 .setMaxTotal(8000)
并临时修复它但后来在其他运行中它在一些迭代后再次卡住。我猜是由于池中缺少连接(我关闭了它们),但线程似乎没有释放连接。
这是我连接的更新版本:
import redis.clients.jedis.{JedisPool, JedisPoolConfig}
object redisOp{
@transient lazy val log: Logger = org.apache.log4j.LogManager.getLogger("myLogger")
def apply(set: RDD[Int]): Unit = {
val spark = SparkConstructor()
val sc = spark.sparkContext
// initialize Parents and Ranks key-values
val parents = set.map(i => ("p"+i, i.toString))
val ranks = set.map(i => ("r"+i, 1.toString))
sc.toRedisKV(parents) // using spark-redis packege here only, ignore it.
sc.toRedisKV(ranks)
log.warn("***Initialized Redis***")
}
val jedisConfig = new JedisPoolConfig() // Check from here (object's values and variables)
jedisConfig.setMaxIdle(8000) //TODO: a better configuration?
jedisConfig.setMaxTotal(8000)
lazy val pool = new JedisPool(jedisConfig, "localhost")
def find(u: Long): Option[Long] = { // returns leader of the set containing u
val r = pool.getResource
val res = Option(r.get(s"p$u")).flatMap(p => if (p.toLong == u) {
Some(u)
} else find(p.toLong))
r.close() // closing back to pool
res
}
// other methods are similar to find()...
}
问题出在你的递归实现上。您在不释放资源的情况下调用下一个递归堆栈。所以在某些时候,最新的堆栈处于资源稀缺状态,因为较旧的堆栈正在持有它们的资源。
因此,在调用下一个递归堆栈之前释放资源。
例如
def find(u: Long): Option[Long] = { // returns leader of the set containing u
val r = pool.getResource
val rget = r.get(s"p$u")
r.close() // closing back to pool
val res = Option(rget).flatMap(p => if (p.toLong == u) {
Some(u)
} else find(p.toLong))
res
}
根据我对 Jedis 和线程的 JedisPool
而不是 Jedis
。但是仍然程序随着线程的增加而卡住。我尝试增加 .setMaxIdle(8000)
和 .setMaxTotal(8000)
并临时修复它但后来在其他运行中它在一些迭代后再次卡住。我猜是由于池中缺少连接(我关闭了它们),但线程似乎没有释放连接。
这是我连接的更新版本:
import redis.clients.jedis.{JedisPool, JedisPoolConfig}
object redisOp{
@transient lazy val log: Logger = org.apache.log4j.LogManager.getLogger("myLogger")
def apply(set: RDD[Int]): Unit = {
val spark = SparkConstructor()
val sc = spark.sparkContext
// initialize Parents and Ranks key-values
val parents = set.map(i => ("p"+i, i.toString))
val ranks = set.map(i => ("r"+i, 1.toString))
sc.toRedisKV(parents) // using spark-redis packege here only, ignore it.
sc.toRedisKV(ranks)
log.warn("***Initialized Redis***")
}
val jedisConfig = new JedisPoolConfig() // Check from here (object's values and variables)
jedisConfig.setMaxIdle(8000) //TODO: a better configuration?
jedisConfig.setMaxTotal(8000)
lazy val pool = new JedisPool(jedisConfig, "localhost")
def find(u: Long): Option[Long] = { // returns leader of the set containing u
val r = pool.getResource
val res = Option(r.get(s"p$u")).flatMap(p => if (p.toLong == u) {
Some(u)
} else find(p.toLong))
r.close() // closing back to pool
res
}
// other methods are similar to find()...
}
问题出在你的递归实现上。您在不释放资源的情况下调用下一个递归堆栈。所以在某些时候,最新的堆栈处于资源稀缺状态,因为较旧的堆栈正在持有它们的资源。
因此,在调用下一个递归堆栈之前释放资源。
例如
def find(u: Long): Option[Long] = { // returns leader of the set containing u
val r = pool.getResource
val rget = r.get(s"p$u")
r.close() // closing back to pool
val res = Option(rget).flatMap(p => if (p.toLong == u) {
Some(u)
} else find(p.toLong))
res
}