如何以编程方式关闭 fs2.StreamApp?

How to shutdown a fs2.StreamApp programmatically?

扩展 StreamApp 要求您提供 stream def。它有一个 requestShutdown 参数。

def stream(args: List[String], requestShutdown: F[Unit]): Stream[F, ExitCode]

我为此提供了实现,并理解 args 作为命令行参数传入。但是,我不确定 requestShutdown 参数由什么提供,我可以用它做什么。

具体来说,我想对正在启动 Http4s 服务器(永远阻塞)的 Stream[IO, ExitCode] 调用正常关闭。

看起来需要 Signal 并且必须设置?我正在尝试 'get at' 的基础流如下所示:

for {
   scheduler <- Scheduler[IO](corePoolSize = 1)
   exitCode  <- BlazeBuilder[IO]
                    .bindHttp(port, "0.0.0.0")
                    .mountService(services(scheduler), "/")
                    .serve
    } yield exitCode

我的 stream def 是 here and StreamAppSpec from the fs2 project has something in the StreamAppSpec 但我不知道该如何调整它。

您可以将提供给流函数的 requestShutdown 参数视为表示 执行时将请求程序终止的操作。

执行它会导致程序结束。

这是一个使用示例:

  override def stream(args: List[String], requestShutdown: IO[Unit]): Stream[IO, ExitCode] =
    for {
      scheduler <- Scheduler[IO](corePoolSize = 1)
      exitStream = scheduler.sleep[IO](10 seconds)
       .evalMap(_ => requestShutdown)
       .map(_ => ExitCode.Success)
      serverStream = BlazeBuilder[IO]
        .bindHttp(port, "0.0.0.0")
        .mountService(services(scheduler), "/")
        .serve
      result <- Stream.emits(List(exitStream, serverStream)).joinUnbounded
    } yield result

在这种情况下,我们创建两个流:

  • 第一个会等待10秒才会触发
    的效果 终止应用程序。

  • 第二个将运行http4s服务器。

然后我们加入这两个流,使它们 运行 同时意味着网络服务器将 运行 持续 10 秒,然后另一个流发出程序应该终止的信号。