如何在 Scala 中使 FIFO 流(阻塞队列)线程安全(可能死锁)?

How to make a FIFO Stream (Blocking Queue) thread-safe in Scala (possible deadlock)?

基于 Is there a FIFO stream in Scala? 提供的答案,我想知道如何在实践中并发使用 FIFOStream(下面重命名为 QueueStream)。因为我是 Scala 和 Rx 的新手,所以到目前为止我坚持使用 Futures,但我欢迎其他选择(特别是如果以这种方式使用 Futures 太痛苦的话)。

下面工作表中的 FIXME 注释说明了该问题;评论说它没有 运行,但根据过去的经验,我认为它可能会以某种方式陷入僵局。

import java.util.concurrent.{Executors, BlockingQueue, LinkedBlockingQueue}
import scala.collection.JavaConversions._
import scala.concurrent.{ExecutionContext, Future}
import scala.concurrent.duration._
import scala.language.postfixOps

implicit val ec = ExecutionContext.fromExecutor(Executors.newFixedThreadPool(8))


class QueueStream[A]( private val queue: BlockingQueue[Option[A]] ) {

  implicit protected val defaultTimeout: Duration = 100 milliseconds

  def toStream(q: BlockingQueue[Option[A]] = queue): Future[Stream[A]] = {
    def computeStream(q: BlockingQueue[Option[A]] = queue): Stream[A] =
      queue.take() match {
        case Some(a) => Stream cons(a, computeStream())
        case None => Stream.empty
      }
    Future {
      computeStream()
    }
  }
  def toStreamNoWait(q: BlockingQueue[Option[A]] = queue ):
  Future[Stream[A]] = {
    def computeStream(q: BlockingQueue[Option[A]] = queue): Stream[A] = {
      val timeout = implicitly[Duration]
      queue.poll(timeout.toMillis, MILLISECONDS) match {
        case Some(a) => Stream cons(a, computeStream())
        case None => Stream.empty
      }
    }
    Future {
      computeStream()
    }
  }

  def size () = queue.size()
  def close() = queue add None
  def enqueue( as: A* ) = queue addAll as.map( Some(_) )
}
object QueueStream {
  def apply[A]() = new QueueStream[A](new LinkedBlockingQueue)
}
def printIntsInQueue(queue: QueueStream[Int]): Unit = {
  println("About to print some Ints...")
  //FIXME: this doesn't run!:
  queue.toStream().foreach { ff => ff foreach { ss =>
    println(s"LENGTH IS: $ss")
  }}
}
val stringList = List("abc", "123", "abc123")
val stringQueue = QueueStream[String]()
stringList.foreach{ii => stringQueue.enqueue(ii)}
val strlenQueue = QueueStream[Int]()
var sz = strlenQueue.size()
println(s"strlenQueue size is $sz")

stringQueue.toStream().foreach{ ff => ff foreach { ss =>
  strlenQueue.enqueue(ss.length)
}}
sz = strlenQueue.size()
println(s"strlenQueue size is $sz")
printIntsInQueue(strlenQueue)

编辑:工作代码

import java.util.concurrent.{Executors, BlockingQueue, LinkedBlockingQueue}
import scala.collection.JavaConversions._
import scala.concurrent.{ExecutionContext, Future}
import scala.concurrent.duration._
import scala.language.postfixOps

implicit val ec = ExecutionContext.fromExecutor(Executors.newFixedThreadPool(8))

class QueueStream[A]( private val queue: BlockingQueue[Option[A]] ) {

  implicit protected val defaultTimeout: Duration = 100 milliseconds

  def toStream(q: BlockingQueue[Option[A]] = queue ): Stream[A] =
    queue.take() match {
      case Some(a) => Stream cons(a, toStream())
      case None => Stream.empty
    }
  def toStreamNoWait(q: BlockingQueue[Option[A]] = queue ): Stream[A] = {
    val timeout = implicitly[Duration]
    queue.poll(timeout.toMillis, MILLISECONDS) match {
      case Some(a) => Stream cons(a, toStreamNoWait())
      case null => Stream.empty
      case None => Stream.empty
    }
  }

  def size () = queue.size()
  def close() = queue add None
  def enqueue( as: A* ) = queue addAll as.map( Some(_) )
}
object QueueStream {
  def apply[A]() = new QueueStream[A](new LinkedBlockingQueue)
}

def printIntsInQueue(queue: QueueStream[Int]): Unit = {
  println("About to print some Ints...")
  //FIXME: this doesn't run!:
  queue.toStream().foreach {ss =>
    println(s"LENGTH IS: $ss")
  }
}
val stringList = List("abc", "123", "abc123")
val stringQueue = QueueStream[String]()
stringList.foreach{ii => stringQueue.enqueue(ii)}
stringQueue.close()
val strlenQueue = QueueStream[Int]()
var sz = strlenQueue.size()
println(s"strlenQueue size is $sz")
stringQueue.toStream().foreach { ss =>
  strlenQueue.enqueue(ss.length)
}
strlenQueue.close()
sz = strlenQueue.size()
println(s"strlenQueue size is $sz")
printIntsInQueue(strlenQueue)

从阻塞队列中获取下一个元素是阻塞操作;它暂停当前线程,直到下一个元素可用。

QueueStream 由阻塞队列支持,因此当您要求它处理队列中的 "all of the elements" 时(通过 foreach),它会阻塞等待更多元素,直到到达流的末尾.对于这个特定的 class,流的末尾由 None 元素表示,当您调用 close() 时,该元素被插入到队列中。

所以是 "hanging" 因为您还没有使用 close() 关闭流。

通常不需要像您所做的那样在将来包装流,因为流是按需延迟计算的。如果需要,调用者仍然可以在后台线程或未来遍历流。