在 Scala 中使用线程与 Redis (Jedis) 时遇到问题

Trouble using Threading with Redis (Jedis) in Scala

我有一个使用线程(实际上是 Spark)和 Redis (Jedis) 的 Scala 程序。我为我的 Redis 操作定义了一个 object,其中我有一个 Lazy val 用于连接。我需要每个线程打开与 Redis 的连接并并行使用它。
连接对象:

object redisOp{
  lazy val r = new Jedis("127.0.0.1",6379,30)
  def find(u: Long): Option[Long] = Option(r.get(s"p$u")).flatMap(p => if (p.toLong == u) Some(u) else find(p.toLong))
  // and other functions
}

当我将它与一个线程一起使用时,它运行良好。但是当多个线程使用它时,我会出错。起初,我在每个线程中得到 Unknown replay: 4,其中“4”是一个随机字符 (redis.clients.jedis.exceptions.JedisConnectionException: Unknown reply:)。
然后从 redis-cli 我尝试设置 config set timeout 3000030000 因为我也在日志中看到 redis.clients.jedis.exceptions.JedisConnectionException: Unexpected end of stream 有时 redis.clients.jedis.exceptions.JedisDataException: ERR Protocol error: invalid multibulk length 。现在在某些运行中(当切换到 2 个线程而不是 4 个线程时),程序永远停留在一个阶段,没有错误!我检查了 Spark-UI 来检查执行者的日志,但找不到任何有用的东西:https://pastebin.com/iJMeBD0D

我认为问题在于定义和使用与 Redis 的连接。另外,如果需要,请告诉我关闭连接的正确方法。

Jedis 对象不是线程安全的。您应该在多线程环境中使用某种 object/connection 池。 Jedis 为此提供了 JedisPool。可以在 Jedis Wiki.

中找到更多详细信息

基本思路是通过 JedisPool.getResource() 获取 Jedis 对象,通过 Jedis.close() 获取对象 return。