没有挂起的 scalaz-stream 队列
scalaz-stream queue without hanging
我有一个分为两部分的问题,所以让我先介绍一下背景。我知道可以做一些类似于我想要的事情:
import scalaz.concurrent._
import scalaz.stream._
val q = async.unboundedQueue[Int]
val p: Process[Task, Int] = q.dequeue
q.enqueueAll(1 to 2).run
val p1: Process1[Int, Int] = process1.take(1)
p.pipe(p1).map(x => println(s"Answer: $x")).run.run
// Answer: 1
p.pipe(p1).map(x => println(s"Answer: $x")).run.run
// Answer: 2
p.pipe(p1).map(x => println(s"Answer: $x")).run.run
// hangs awaiting next input
是否有其他一些我可以使用的 p1
可以在不挂起的情况下给我下面的输出(就像 process1.awaitOption
)?
Answer: Some(1)
Answer: Some(2)
Answer: None
如果是的话,我想下一个问题就很容易回答了。是否有其他一些我可以使用的 p1
可以在不挂起的情况下给我下面的输出(就像 process1.chunkAll
)?
Answer: Seq(1, 2)
Answer: Seq()
Answer: Seq()
编辑:
补充问题以使其更易于理解。如果我有这样的循环:
for (i <- 1 to 4) {
p.pipe(p1).map(x => println(s"Answer: $x")).run.run
}
结果可能是:
Answer: Seq()
// if someone pushes some values into the queue, like: q.enqueueAll(1 to 2).run
Answer: Seq(1, 2)
Answer: Seq()
Answer: Seq()
我希望现在我想做什么已经很清楚了。问题是我无法控制循环,如果队列中没有值,我不能阻止它。
我不确定我是否理解你试图拥有的语义,但通常过程可能会被中断(这意味着取消以等待某个值)通过外部关闭队列或使用 wye。打断。
您希望何时终止进程而不是等待下一个排队值?如果假设您想在空队列中使用它,则有 "size" 进程,如果大小为空,您可以使用它来中断等待队列,例如:
val empty : Process[Task,Boolean] = q.size.continuous.map(_ <= 0)
val deq : Process[Task,Int] = empty.wye(q.enqueue)(wye.interrupt)
虽然我无法让 以我想要的方式工作,但这是一个转折点,我可以根据他的建议使用尺寸信号。
我在这里发布我的答案以防有人需要它:
import scalaz.concurrent._
import scalaz.stream._
val q = async.unboundedQueue[Int]
val p: Process[Task, Int] = q.size.continuous.take(1).flatMap { n => q.dequeue |> process1.take(n) }
q.enqueueAll(1 to 2).run
p.map(x => println(s"Answer: $x")).run.run
// Answer: 1
// Answer: 2
p.map(x => println(s"Answer: $x")).run.run
// not hanging awaiting next input
p.map(x => println(s"Answer: $x")).run.run
// not hanging awaiting next input
q.enqueueAll(1 to 2).run
p.map(x => println(s"Answer: $x")).run.run
// Answer: 1
// Answer: 2
我意识到它并没有完全回答问题,因为我没有明确的 p1
,但它对我的目的来说很好。
我有一个分为两部分的问题,所以让我先介绍一下背景。我知道可以做一些类似于我想要的事情:
import scalaz.concurrent._
import scalaz.stream._
val q = async.unboundedQueue[Int]
val p: Process[Task, Int] = q.dequeue
q.enqueueAll(1 to 2).run
val p1: Process1[Int, Int] = process1.take(1)
p.pipe(p1).map(x => println(s"Answer: $x")).run.run
// Answer: 1
p.pipe(p1).map(x => println(s"Answer: $x")).run.run
// Answer: 2
p.pipe(p1).map(x => println(s"Answer: $x")).run.run
// hangs awaiting next input
是否有其他一些我可以使用的 p1
可以在不挂起的情况下给我下面的输出(就像 process1.awaitOption
)?
Answer: Some(1)
Answer: Some(2)
Answer: None
如果是的话,我想下一个问题就很容易回答了。是否有其他一些我可以使用的 p1
可以在不挂起的情况下给我下面的输出(就像 process1.chunkAll
)?
Answer: Seq(1, 2)
Answer: Seq()
Answer: Seq()
编辑:
补充问题以使其更易于理解。如果我有这样的循环:
for (i <- 1 to 4) {
p.pipe(p1).map(x => println(s"Answer: $x")).run.run
}
结果可能是:
Answer: Seq()
// if someone pushes some values into the queue, like: q.enqueueAll(1 to 2).run
Answer: Seq(1, 2)
Answer: Seq()
Answer: Seq()
我希望现在我想做什么已经很清楚了。问题是我无法控制循环,如果队列中没有值,我不能阻止它。
我不确定我是否理解你试图拥有的语义,但通常过程可能会被中断(这意味着取消以等待某个值)通过外部关闭队列或使用 wye。打断。
您希望何时终止进程而不是等待下一个排队值?如果假设您想在空队列中使用它,则有 "size" 进程,如果大小为空,您可以使用它来中断等待队列,例如:
val empty : Process[Task,Boolean] = q.size.continuous.map(_ <= 0)
val deq : Process[Task,Int] = empty.wye(q.enqueue)(wye.interrupt)
虽然我无法让
我在这里发布我的答案以防有人需要它:
import scalaz.concurrent._
import scalaz.stream._
val q = async.unboundedQueue[Int]
val p: Process[Task, Int] = q.size.continuous.take(1).flatMap { n => q.dequeue |> process1.take(n) }
q.enqueueAll(1 to 2).run
p.map(x => println(s"Answer: $x")).run.run
// Answer: 1
// Answer: 2
p.map(x => println(s"Answer: $x")).run.run
// not hanging awaiting next input
p.map(x => println(s"Answer: $x")).run.run
// not hanging awaiting next input
q.enqueueAll(1 to 2).run
p.map(x => println(s"Answer: $x")).run.run
// Answer: 1
// Answer: 2
我意识到它并没有完全回答问题,因为我没有明确的 p1
,但它对我的目的来说很好。