由于 Jedis Pool,Spark 程序卡住了

Spark program stucks because of Jedis Pool

根据我对 Jedis 和线程的 。我更改了我的代码以使用 JedisPool 而不是 Jedis。但是仍然程序随着线程的增加而卡住。我尝试增加 .setMaxIdle(8000).setMaxTotal(8000) 并临时修复它但后来在其他运行中它在一些迭代后再次卡住。我猜是由于池中缺少连接(我关闭了它们),但线程似乎没有释放连接。

这是我连接的更新版本:

import redis.clients.jedis.{JedisPool, JedisPoolConfig}
 
object redisOp{
  @transient lazy val log: Logger = org.apache.log4j.LogManager.getLogger("myLogger")
  def apply(set: RDD[Int]): Unit = {
    val spark = SparkConstructor()
    val sc = spark.sparkContext
    // initialize Parents and Ranks key-values
    val parents = set.map(i => ("p"+i, i.toString))
    val ranks = set.map(i => ("r"+i, 1.toString))
    sc.toRedisKV(parents) // using spark-redis packege here only, ignore it.
    sc.toRedisKV(ranks)
    log.warn("***Initialized Redis***")
 
  }
 
  val jedisConfig = new JedisPoolConfig()          // Check from here (object's values and variables)
  jedisConfig.setMaxIdle(8000)                    //TODO: a better configuration?
  jedisConfig.setMaxTotal(8000)
  lazy val pool = new JedisPool(jedisConfig, "localhost")

  def find(u: Long): Option[Long] = { // returns leader of the set containing u
    val r = pool.getResource
    val res = Option(r.get(s"p$u")).flatMap(p => if (p.toLong == u) {
      Some(u)
    } else find(p.toLong))
    r.close()                                    // closing back to pool
    res
  }
// other methods are similar to find()...
}

问题出在你的递归实现上。您在不释放资源的情况下调用下一个递归堆栈。所以在某些时候,最新的堆栈处于资源稀缺状态,因为较旧的堆栈正在持有它们的资源。

因此,在调用下一个递归堆栈之前释放资源。

例如

  def find(u: Long): Option[Long] = { // returns leader of the set containing u
    val r = pool.getResource
    val rget = r.get(s"p$u")
    r.close()                                    // closing back to pool
    val res = Option(rget).flatMap(p => if (p.toLong == u) {
      Some(u)
    } else find(p.toLong))
    res
  }