在 scalaz-stream 中,当 Sink 终止时,我如何 运行 执行任务?

In scalaz-stream, how can i run a task when a Sink is terminated?

我从http4s复制了一个例子:

// Print received Text frames, and, on completion, notify the console
  val sink: Sink[Task, WebSocketFrame] = Process.constant {
    case Text(t) => Task.delay(println(t))
    case f       => Task.delay(println(s"Unknown type: $f"))
  }.onComplete(Process.eval[Task, Nothing](Task{println("Terminated!")}).drain)

这会产生一个编译错误:"Expression of type Unit doesn't conform to expected type _A"

我只想在接收器终止时打印行 "Terminated!"

如果您从 Process.eval 调用中删除类型,它应该可以工作:

... .onComplete(Process.eval(Task{println("Terminated!")}).drain)

这里的问题是您的 Process.eval 调用构造了一个 Process[Task, Unit] 而不是您通过明确给出类型所要求的 Process[Task, Nothing] 。之后调用 drainProcess[Task, Unit] 转换为 Process[Task, Nothing]

更新:这是一个在完成时执行一些副作用的接收器示例:

scala> val sink = io.stdOutLines.onComplete(Process.eval_(Task(println("END"))))
sink: scalaz.stream.Process[[x]scalaz.concurrent.Task[x],String => scalaz.concurrent.Task[Unit]] = Append(Emit(Vector(<function1>)),Vector(<function1>, <function1>))

scala> Process("a", "b", "c").toSource.to(sink).run.run
a
b
c
END