Creating a process from a queue in scalaz
I'm trying to follow the first example in https://github.com/functional-streams-for-scala/fs2/wiki/Binding-to-asynchronous-processes
Filling in some gaps and adding some debug prints, I ended up with the following code:
import java.util.concurrent.ScheduledExecutorService
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import scalaz.concurrent.Task
import scalaz.stream.async.mutable.Queue
import scalaz.stream.{Process, Sink}

object ProcessTest {
  def main(args: Array[String]): Unit = {
    import scala.concurrent.ExecutionContext.Implicits.global
    import scalaz.stream.async

    val q: Queue[Int] = async.unboundedQueue[Int]
    val src: Process[Task, Int] = q.dequeue

    // Thread 1
    val f1 = Future {
      for (i <- 0 to 10) {
        println(s"enqueueOne $i")
        Thread.sleep(100)
        q.enqueueOne(i)
      }
      println("closing")
      q.close
      println("closed")
    }

    // Thread 2
    val f2 = Future {
      val buf = new collection.mutable.ArrayBuffer[Int]
      val snk: Sink[Task, Int] = scalaz.stream.io.fillBuffer(buf)
      val run: Task[Unit] = src.map(x => {
        println(s"map $x")
        x
      }).to(snk).run
      println("running")
      run.get.runFor(3.seconds)
      println(s"result = ${buf.toList}")
    }

    Await.result(f1, 10.seconds)
    Await.result(f2, 10.seconds)
  }
}
When I run it, nothing is received in thread 2:
enqueueOne 0
running
enqueueOne 1
enqueueOne 2
enqueueOne 3
enqueueOne 4
enqueueOne 5
enqueueOne 6
enqueueOne 7
enqueueOne 8
enqueueOne 9
enqueueOne 10
closing
closed
[error] (run-main-9) java.util.concurrent.TimeoutException
What am I doing wrong? Where is this blocking?
(I'm using scalaz-stream 0.8.6.)
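OK, I found the problem: enqueueOne and close each return a Task, which only describes the action. Nothing is enqueued and the queue is never closed until those Tasks are run, which is why thread 2 timed out. The fix is to run them: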
// Thread 1
val f1 = Future {
  for (i <- 0 to 10) {
    println(s"enqueueOne $i")
    Thread.sleep(100)
    q.enqueueOne(i).run
  }
  println("closing")
  q.close.run
  println("closed")
}
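To make the "a Task must be run" point concrete, here is a minimal sketch that does not use scalaz at all. LazyTask and ToyQueue are hypothetical stand-ins I made up for illustration (they are not the scalaz-stream API); they only mimic the one property that matters here, namely that enqueueOne returns a description of the effect instead of performing it.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical stand-in for scalaz.concurrent.Task: it only stores a thunk
// and does nothing until run() is called.
final class LazyTask[A](thunk: () => A) {
  def run(): A = thunk()
}

// Hypothetical stand-in for the queue: enqueueOne returns a description
// of the enqueue instead of performing it.
final class ToyQueue[A] {
  val items = ArrayBuffer.empty[A]
  def enqueueOne(a: A): LazyTask[Unit] =
    new LazyTask(() => { items += a; () })
}

object LazyTaskDemo {
  def main(args: Array[String]): Unit = {
    val q = new ToyQueue[Int]

    // The task is built and then discarded, so nothing is enqueued.
    q.enqueueOne(1)
    println(q.items.toList) // prints List()

    // The task is actually run, so the element is enqueued.
    q.enqueueOne(2).run()
    println(q.items.toList) // prints List(2)
  }
}
```

The first call mirrors the bug in the question: the compiler accepts it because discarding a value in statement position is legal, so the mistake only shows up at runtime as a consumer that never receives anything.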