如何在 Scala 中使 FIFO 流(阻塞队列)线程安全(可能死锁)?
How to make a FIFO Stream (Blocking Queue) thread-safe in Scala (possible deadlock)?
基于 Is there a FIFO stream in Scala? 提供的答案,我想知道如何在实践中并发使用 FIFOStream(下面重命名为 QueueStream)。因为我是 Scala 和 Rx 的新手,所以到目前为止我坚持使用 Futures,但我欢迎其他选择(特别是如果以这种方式使用 Futures 太痛苦的话)。
下面工作表中的 FIXME
注释说明了该问题;评论说它没有 运行,但根据过去的经验,我认为它可能会以某种方式陷入僵局。
import java.util.concurrent.{Executors, BlockingQueue, LinkedBlockingQueue}
import scala.collection.JavaConversions._
import scala.concurrent.{ExecutionContext, Future}
import scala.concurrent.duration._
import scala.language.postfixOps
implicit val ec = ExecutionContext.fromExecutor(Executors.newFixedThreadPool(8))
class QueueStream[A]( private val queue: BlockingQueue[Option[A]] ) {
implicit protected val defaultTimeout: Duration = 100 milliseconds
def toStream(q: BlockingQueue[Option[A]] = queue): Future[Stream[A]] = {
def computeStream(q: BlockingQueue[Option[A]] = queue): Stream[A] =
queue.take() match {
case Some(a) => Stream cons(a, computeStream())
case None => Stream.empty
}
Future {
computeStream()
}
}
def toStreamNoWait(q: BlockingQueue[Option[A]] = queue ):
Future[Stream[A]] = {
def computeStream(q: BlockingQueue[Option[A]] = queue): Stream[A] = {
val timeout = implicitly[Duration]
queue.poll(timeout.toMillis, MILLISECONDS) match {
case Some(a) => Stream cons(a, computeStream())
case None => Stream.empty
}
}
Future {
computeStream()
}
}
def size () = queue.size()
def close() = queue add None
def enqueue( as: A* ) = queue addAll as.map( Some(_) )
}
object QueueStream {
def apply[A]() = new QueueStream[A](new LinkedBlockingQueue)
}
def printIntsInQueue(queue: QueueStream[Int]): Unit = {
println("About to print some Ints...")
//FIXME: this doesn't run!:
queue.toStream().foreach { ff => ff foreach { ss =>
println(s"LENGTH IS: $ss")
}}
}
val stringList = List("abc", "123", "abc123")
val stringQueue = QueueStream[String]()
stringList.foreach{ii => stringQueue.enqueue(ii)}
val strlenQueue = QueueStream[Int]()
var sz = strlenQueue.size()
println(s"strlenQueue size is $sz")
stringQueue.toStream().foreach{ ff => ff foreach { ss =>
strlenQueue.enqueue(ss.length)
}}
sz = strlenQueue.size()
println(s"strlenQueue size is $sz")
printIntsInQueue(strlenQueue)
编辑:工作代码
import java.util.concurrent.{Executors, BlockingQueue, LinkedBlockingQueue}
import scala.collection.JavaConversions._
import scala.concurrent.{ExecutionContext, Future}
import scala.concurrent.duration._
import scala.language.postfixOps
implicit val ec = ExecutionContext.fromExecutor(Executors.newFixedThreadPool(8))
class QueueStream[A]( private val queue: BlockingQueue[Option[A]] ) {
implicit protected val defaultTimeout: Duration = 100 milliseconds
def toStream(q: BlockingQueue[Option[A]] = queue ): Stream[A] =
queue.take() match {
case Some(a) => Stream cons(a, toStream())
case None => Stream.empty
}
def toStreamNoWait(q: BlockingQueue[Option[A]] = queue ): Stream[A] = {
val timeout = implicitly[Duration]
queue.poll(timeout.toMillis, MILLISECONDS) match {
case Some(a) => Stream cons(a, toStreamNoWait())
case null => Stream.empty
case None => Stream.empty
}
}
def size () = queue.size()
def close() = queue add None
def enqueue( as: A* ) = queue addAll as.map( Some(_) )
}
object QueueStream {
def apply[A]() = new QueueStream[A](new LinkedBlockingQueue)
}
def printIntsInQueue(queue: QueueStream[Int]): Unit = {
println("About to print some Ints...")
//FIXME: this doesn't run!:
queue.toStream().foreach {ss =>
println(s"LENGTH IS: $ss")
}
}
val stringList = List("abc", "123", "abc123")
val stringQueue = QueueStream[String]()
stringList.foreach{ii => stringQueue.enqueue(ii)}
stringQueue.close()
val strlenQueue = QueueStream[Int]()
var sz = strlenQueue.size()
println(s"strlenQueue size is $sz")
stringQueue.toStream().foreach { ss =>
strlenQueue.enqueue(ss.length)
}
strlenQueue.close()
sz = strlenQueue.size()
println(s"strlenQueue size is $sz")
printIntsInQueue(strlenQueue)
从阻塞队列中获取下一个元素是阻塞操作;它暂停当前线程,直到下一个元素可用。
QueueStream 由阻塞队列支持,因此当您要求它处理队列中的 "all of the elements" 时(通过 foreach),它会阻塞等待更多元素,直到到达流的末尾.对于这个特定的 class,流的末尾由 None 元素表示,当您调用 close() 时,该元素被插入到队列中。
所以是 "hanging" 因为您还没有使用 close() 关闭流。
通常不需要像您所做的那样在将来包装流,因为流是按需延迟计算的。如果需要,调用者仍然可以在后台线程或未来遍历流。
基于 Is there a FIFO stream in Scala? 提供的答案,我想知道如何在实践中并发使用 FIFOStream(下面重命名为 QueueStream)。因为我是 Scala 和 Rx 的新手,所以到目前为止我坚持使用 Futures,但我欢迎其他选择(特别是如果以这种方式使用 Futures 太痛苦的话)。
下面工作表中的 FIXME
注释说明了该问题;评论说它没有 运行,但根据过去的经验,我认为它可能会以某种方式陷入僵局。
import java.util.concurrent.{Executors, BlockingQueue, LinkedBlockingQueue}
import scala.collection.JavaConversions._
import scala.concurrent.{ExecutionContext, Future}
import scala.concurrent.duration._
import scala.language.postfixOps
implicit val ec = ExecutionContext.fromExecutor(Executors.newFixedThreadPool(8))
class QueueStream[A]( private val queue: BlockingQueue[Option[A]] ) {
implicit protected val defaultTimeout: Duration = 100 milliseconds
def toStream(q: BlockingQueue[Option[A]] = queue): Future[Stream[A]] = {
def computeStream(q: BlockingQueue[Option[A]] = queue): Stream[A] =
queue.take() match {
case Some(a) => Stream cons(a, computeStream())
case None => Stream.empty
}
Future {
computeStream()
}
}
def toStreamNoWait(q: BlockingQueue[Option[A]] = queue ):
Future[Stream[A]] = {
def computeStream(q: BlockingQueue[Option[A]] = queue): Stream[A] = {
val timeout = implicitly[Duration]
queue.poll(timeout.toMillis, MILLISECONDS) match {
case Some(a) => Stream cons(a, computeStream())
case None => Stream.empty
}
}
Future {
computeStream()
}
}
def size () = queue.size()
def close() = queue add None
def enqueue( as: A* ) = queue addAll as.map( Some(_) )
}
object QueueStream {
def apply[A]() = new QueueStream[A](new LinkedBlockingQueue)
}
def printIntsInQueue(queue: QueueStream[Int]): Unit = {
println("About to print some Ints...")
//FIXME: this doesn't run!:
queue.toStream().foreach { ff => ff foreach { ss =>
println(s"LENGTH IS: $ss")
}}
}
val stringList = List("abc", "123", "abc123")
val stringQueue = QueueStream[String]()
stringList.foreach{ii => stringQueue.enqueue(ii)}
val strlenQueue = QueueStream[Int]()
var sz = strlenQueue.size()
println(s"strlenQueue size is $sz")
stringQueue.toStream().foreach{ ff => ff foreach { ss =>
strlenQueue.enqueue(ss.length)
}}
sz = strlenQueue.size()
println(s"strlenQueue size is $sz")
printIntsInQueue(strlenQueue)
编辑:工作代码
import java.util.concurrent.{Executors, BlockingQueue, LinkedBlockingQueue}
import scala.collection.JavaConversions._
import scala.concurrent.{ExecutionContext, Future}
import scala.concurrent.duration._
import scala.language.postfixOps
implicit val ec = ExecutionContext.fromExecutor(Executors.newFixedThreadPool(8))
class QueueStream[A]( private val queue: BlockingQueue[Option[A]] ) {
implicit protected val defaultTimeout: Duration = 100 milliseconds
def toStream(q: BlockingQueue[Option[A]] = queue ): Stream[A] =
queue.take() match {
case Some(a) => Stream cons(a, toStream())
case None => Stream.empty
}
def toStreamNoWait(q: BlockingQueue[Option[A]] = queue ): Stream[A] = {
val timeout = implicitly[Duration]
queue.poll(timeout.toMillis, MILLISECONDS) match {
case Some(a) => Stream cons(a, toStreamNoWait())
case null => Stream.empty
case None => Stream.empty
}
}
def size () = queue.size()
def close() = queue add None
def enqueue( as: A* ) = queue addAll as.map( Some(_) )
}
object QueueStream {
def apply[A]() = new QueueStream[A](new LinkedBlockingQueue)
}
def printIntsInQueue(queue: QueueStream[Int]): Unit = {
println("About to print some Ints...")
//FIXME: this doesn't run!:
queue.toStream().foreach {ss =>
println(s"LENGTH IS: $ss")
}
}
val stringList = List("abc", "123", "abc123")
val stringQueue = QueueStream[String]()
stringList.foreach{ii => stringQueue.enqueue(ii)}
stringQueue.close()
val strlenQueue = QueueStream[Int]()
var sz = strlenQueue.size()
println(s"strlenQueue size is $sz")
stringQueue.toStream().foreach { ss =>
strlenQueue.enqueue(ss.length)
}
strlenQueue.close()
sz = strlenQueue.size()
println(s"strlenQueue size is $sz")
printIntsInQueue(strlenQueue)
从阻塞队列中获取下一个元素是阻塞操作;它暂停当前线程,直到下一个元素可用。
QueueStream 由阻塞队列支持,因此当您要求它处理队列中的 "all of the elements" 时(通过 foreach),它会阻塞等待更多元素,直到到达流的末尾.对于这个特定的 class,流的末尾由 None 元素表示,当您调用 close() 时,该元素被插入到队列中。
所以是 "hanging" 因为您还没有使用 close() 关闭流。
通常不需要像您所做的那样在将来包装流,因为流是按需延迟计算的。如果需要,调用者仍然可以在后台线程或未来遍历流。