在 scalaz-stream 中,当 Sink 终止时,我如何 运行 执行任务?
In scalaz-stream, how can i run a task when a Sink is terminated?
我从http4s复制了一个例子:
// Print received Text frames, and, on completion, notify the console
val sink: Sink[Task, WebSocketFrame] = Process.constant {
case Text(t) => Task.delay(println(t))
case f => Task.delay(println(s"Unknown type: $f"))
}.onComplete(Process.eval[Task, Nothing](Task{println("Terminated!")}).drain)
这会产生一个编译错误:"Expression of type Unit doesn't conform to expected type _A"
我只想在接收器终止时打印行 "Terminated!"
如果您从 Process.eval
调用中删除类型,它应该可以工作:
... .onComplete(Process.eval(Task{println("Terminated!")}).drain)
这里的问题是您的 Process.eval
调用构造了一个 Process[Task, Unit]
而不是您通过明确给出类型所要求的 Process[Task, Nothing]
。之后调用 drain
将 Process[Task, Unit]
转换为 Process[Task, Nothing]
。
更新:这是一个在完成时执行一些副作用的接收器示例:
scala> val sink = io.stdOutLines.onComplete(Process.eval_(Task(println("END"))))
sink: scalaz.stream.Process[[x]scalaz.concurrent.Task[x],String => scalaz.concurrent.Task[Unit]] = Append(Emit(Vector(<function1>)),Vector(<function1>, <function1>))
scala> Process("a", "b", "c").toSource.to(sink).run.run
a
b
c
END
我从http4s复制了一个例子:
// Print received Text frames, and, on completion, notify the console
val sink: Sink[Task, WebSocketFrame] = Process.constant {
case Text(t) => Task.delay(println(t))
case f => Task.delay(println(s"Unknown type: $f"))
}.onComplete(Process.eval[Task, Nothing](Task{println("Terminated!")}).drain)
这会产生一个编译错误:"Expression of type Unit doesn't conform to expected type _A"
我只想在接收器终止时打印行 "Terminated!"
如果您从 Process.eval
调用中删除类型,它应该可以工作:
... .onComplete(Process.eval(Task{println("Terminated!")}).drain)
这里的问题是您的 Process.eval
调用构造了一个 Process[Task, Unit]
而不是您通过明确给出类型所要求的 Process[Task, Nothing]
。之后调用 drain
将 Process[Task, Unit]
转换为 Process[Task, Nothing]
。
更新:这是一个在完成时执行一些副作用的接收器示例:
scala> val sink = io.stdOutLines.onComplete(Process.eval_(Task(println("END"))))
sink: scalaz.stream.Process[[x]scalaz.concurrent.Task[x],String => scalaz.concurrent.Task[Unit]] = Append(Emit(Vector(<function1>)),Vector(<function1>, <function1>))
scala> Process("a", "b", "c").toSource.to(sink).run.run
a
b
c
END