为什么我的代码没有返回任何东西?斯卡拉FS2
why is my code not returning anything ? Scala fs2
该程序允许将 Mapping Ints 推入 Double 并从队列中识别退出时间。
该程序未显示任何错误,但未打印任何内容。
我错过了什么?
import cats.effect.{ExitCode, IO, IOApp, Timer}
import fs2._
import fs2.concurrent.Queue
import scala.concurrent.duration._
import scala.util.Random
class Tst(q1: Queue[IO, (Double, IO[Long])])(implicit timer: Timer[IO]) {
val streamData = Stream.emit(1)
val scheduledStream = Stream.fixedDelay[IO](10.seconds) >> streamData
def storeInQueue: Stream[IO, Unit] = {
scheduledStream
.map { n =>
val entryTime =
timer.clock.realTime(java.util.concurrent.TimeUnit.SECONDS)
(n.toDouble, entryTime)
}
.through(q1.enqueue)
.evalTap(n => IO.delay(println(s"Pushing $n to Queue")))
q1.dequeue
.evalTap(_ => timer.sleep(Random.between(10, 30).seconds))
.map { n =>
val exitTime =
timer.clock.realTime(java.util.concurrent.TimeUnit.SECONDS)
(n._1, exitTime)
}
.evalMap(n => IO.delay(println(s"Pulling from queue $n")))
}
}
object Five2 extends IOApp {
override def run(args: List[String]): IO[ExitCode] = {
val program = for {
q <- Queue.bounded[IO, (Double, IO[Long])](1)
b = new Tst(q)
_ <- b.storeInQueue.compile.drain
} yield ()
program.as(ExitCode.Success)
}
}
IO 是延迟求值的——要执行某些东西,它必须是创建最终 IO 值的表达式的一部分。
这里:
def storeInQueue: Stream[IO, Unit] = {
scheduledStream ... // no side effects are run when we create this!
q1.dequeue ... // not using scheduledStream
}
value scheduledStream
根本没有被使用,所以它不是 storeInQueue
返回值的 "a part" 所以当 IOApp
将 IO 值转换为计算时,您程序的配方不包含将消息推送到队列的部分,因此队列始终为空。
订阅队列的部分工作,但由于队列中没有任何内容,它一直在等待永远不会到达的项目。
您必须在 "making them part of one IO value" 之前启动两个流,例如像这样:
class Tst(q1: Queue[IO, (Double, IO[Long])])(implicit timer: Timer[IO]) {
val streamData = Stream.emit(1)
val scheduledStream = Stream.fixedDelay[IO](10.seconds) >> streamData
def storeInQueue =
scheduledStream
.map { n =>
val entryTime =
timer.clock.realTime(java.util.concurrent.TimeUnit.SECONDS)
(n.toDouble, entryTime)
}
.through(q1.enqueue)
.evalTap(n => IO.delay(println(s"Pushing $n to Queue")))
def takeFromQueue =
q1.dequeue
.evalTap(_ => timer.sleep(Random.between(10, 30).seconds))
.map { n =>
val exitTime =
timer.clock.realTime(java.util.concurrent.TimeUnit.SECONDS)
(n._1, exitTime)
}
.evalMap(n => IO.delay(println(s"Pulling from queue $n")))
}
}
object Five2 extends IOApp {
override def run(args: List[String]): IO[ExitCode] = {
val program = for {
q <- Queue.bounded[IO, (Double, IO[Long])](1)
b = new Tst(q)
pushFiber <- b.storeInQueue.compile.drain.start // run as fiber
pullFiber <- b.takeFromQueue.compile.drain.start // run as fiber
} yield ()
program.as(ExitCode.Success)
}
}
该程序允许将 Mapping Ints 推入 Double 并从队列中识别退出时间。 该程序未显示任何错误,但未打印任何内容。 我错过了什么?
import cats.effect.{ExitCode, IO, IOApp, Timer}
import fs2._
import fs2.concurrent.Queue
import scala.concurrent.duration._
import scala.util.Random
class Tst(q1: Queue[IO, (Double, IO[Long])])(implicit timer: Timer[IO]) {
val streamData = Stream.emit(1)
val scheduledStream = Stream.fixedDelay[IO](10.seconds) >> streamData
def storeInQueue: Stream[IO, Unit] = {
scheduledStream
.map { n =>
val entryTime =
timer.clock.realTime(java.util.concurrent.TimeUnit.SECONDS)
(n.toDouble, entryTime)
}
.through(q1.enqueue)
.evalTap(n => IO.delay(println(s"Pushing $n to Queue")))
q1.dequeue
.evalTap(_ => timer.sleep(Random.between(10, 30).seconds))
.map { n =>
val exitTime =
timer.clock.realTime(java.util.concurrent.TimeUnit.SECONDS)
(n._1, exitTime)
}
.evalMap(n => IO.delay(println(s"Pulling from queue $n")))
}
}
object Five2 extends IOApp {
override def run(args: List[String]): IO[ExitCode] = {
val program = for {
q <- Queue.bounded[IO, (Double, IO[Long])](1)
b = new Tst(q)
_ <- b.storeInQueue.compile.drain
} yield ()
program.as(ExitCode.Success)
}
}
IO 是延迟求值的——要执行某些东西,它必须是创建最终 IO 值的表达式的一部分。
这里:
def storeInQueue: Stream[IO, Unit] = {
scheduledStream ... // no side effects are run when we create this!
q1.dequeue ... // not using scheduledStream
}
value scheduledStream
根本没有被使用,所以它不是 storeInQueue
返回值的 "a part" 所以当 IOApp
将 IO 值转换为计算时,您程序的配方不包含将消息推送到队列的部分,因此队列始终为空。
订阅队列的部分工作,但由于队列中没有任何内容,它一直在等待永远不会到达的项目。
您必须在 "making them part of one IO value" 之前启动两个流,例如像这样:
class Tst(q1: Queue[IO, (Double, IO[Long])])(implicit timer: Timer[IO]) {
val streamData = Stream.emit(1)
val scheduledStream = Stream.fixedDelay[IO](10.seconds) >> streamData
def storeInQueue =
scheduledStream
.map { n =>
val entryTime =
timer.clock.realTime(java.util.concurrent.TimeUnit.SECONDS)
(n.toDouble, entryTime)
}
.through(q1.enqueue)
.evalTap(n => IO.delay(println(s"Pushing $n to Queue")))
def takeFromQueue =
q1.dequeue
.evalTap(_ => timer.sleep(Random.between(10, 30).seconds))
.map { n =>
val exitTime =
timer.clock.realTime(java.util.concurrent.TimeUnit.SECONDS)
(n._1, exitTime)
}
.evalMap(n => IO.delay(println(s"Pulling from queue $n")))
}
}
object Five2 extends IOApp {
override def run(args: List[String]): IO[ExitCode] = {
val program = for {
q <- Queue.bounded[IO, (Double, IO[Long])](1)
b = new Tst(q)
pushFiber <- b.storeInQueue.compile.drain.start // run as fiber
pullFiber <- b.takeFromQueue.compile.drain.start // run as fiber
} yield ()
program.as(ExitCode.Success)
}
}