如何终止从 SBT Shell 开始的 FS2 流?
How to terminate FS2 stream started from SBT Shell?
如果我运行这个来自SBT shell的程序,然后取消它,它会一直打印“hello”。我必须退出 SBT 才能让它停止。这是为什么?
import cats.effect.{ExitCode, IO, IOApp}
import fs2.Stream
import scala.concurrent.duration._
object FS2 extends IOApp {
override def run(args: List[String]) =
Stream.awakeEvery[IO](5.seconds).map { _ =>
println("hello")
}.compile.drain.as(ExitCode.Error)
}
正如评论中已经提到的那样,您的应用程序在另一个线程中运行并且它永远不会终止,因为流是无限的,因此当应用程序(每当您点击 ctrl+c
终止应用程序时就会发出)。
你可以这样做:
- 创建 Deferred 实例
- 在接收到 TERM 或 INT 信号后,使用它来触发
interruptWhen
。
例如:
import sun.misc.Signal
object FS2 extends IOApp {
override def run(args: List[String]): IO[ExitCode] = for {
cancel <- Deferred[IO, Either[Throwable, Unit]] //deferred used as flat telling if terminations signal
//was received
_ <- (IO.async_[Unit]{ cb =>
Signal.handle(
new Signal("INT"), //INT and TERM signals are nearly identical, we have to handle both
(sig: Signal) => cb(Right(()))
)
Signal.handle(
new Signal("TERM"),
(sig: Signal) => cb(Right(()))
)
} *> cancel.complete(Right(()))).start //after INT or TERM signal is intercepted it will complete
//deferred and terminate fiber
//we have to run method start to run waiting for signal in another fiber
//in other case program will block here
app <- Stream.awakeEvery[IO](1.seconds).map { _ => //your stream
println("hello")
}.interruptWhen(cancel).compile.drain.as(ExitCode.Error) //interruptWhen ends stream when deferred completes
} yield app
}
只要您在 sbt shell.
中点击 ctrl + c
,此版本的应用程序就会终止
如果我运行这个来自SBT shell的程序,然后取消它,它会一直打印“hello”。我必须退出 SBT 才能让它停止。这是为什么?
import cats.effect.{ExitCode, IO, IOApp}
import fs2.Stream
import scala.concurrent.duration._
object FS2 extends IOApp {
override def run(args: List[String]) =
Stream.awakeEvery[IO](5.seconds).map { _ =>
println("hello")
}.compile.drain.as(ExitCode.Error)
}
正如评论中已经提到的那样,您的应用程序在另一个线程中运行并且它永远不会终止,因为流是无限的,因此当应用程序(每当您点击 ctrl+c
终止应用程序时就会发出)。
你可以这样做:
- 创建 Deferred 实例
- 在接收到 TERM 或 INT 信号后,使用它来触发
interruptWhen
。
例如:
import sun.misc.Signal
object FS2 extends IOApp {
override def run(args: List[String]): IO[ExitCode] = for {
cancel <- Deferred[IO, Either[Throwable, Unit]] //deferred used as flat telling if terminations signal
//was received
_ <- (IO.async_[Unit]{ cb =>
Signal.handle(
new Signal("INT"), //INT and TERM signals are nearly identical, we have to handle both
(sig: Signal) => cb(Right(()))
)
Signal.handle(
new Signal("TERM"),
(sig: Signal) => cb(Right(()))
)
} *> cancel.complete(Right(()))).start //after INT or TERM signal is intercepted it will complete
//deferred and terminate fiber
//we have to run method start to run waiting for signal in another fiber
//in other case program will block here
app <- Stream.awakeEvery[IO](1.seconds).map { _ => //your stream
println("hello")
}.interruptWhen(cancel).compile.drain.as(ExitCode.Error) //interruptWhen ends stream when deferred completes
} yield app
}
只要您在 sbt shell.
中点击ctrl + c
,此版本的应用程序就会终止