FS2 将资源(或效果)作为状态传递
FS2 passing resource (or effect) as a state
我正在尝试实现一个控制相机的应用程序。相机命令表示为 CameraAction 对象流:
sealed trait CameraMessage
case object Record(recordId: String) extends CameraMessage
case object Stop extends CameraMessage
...
val s = Stream[F, CameraMessage]
假设我有一个测试流发出 "Record" 并在 20 秒后发出 "Stop" ,再过 20 秒后发出另一个 "Record" 消息等等,输入流是无限的。
然后应用程序使用 "Record" 它应该创建一个 GStreamer 管道的实例(即它是一个效果)和 "run" 它,在 "Stop" 它 'stops'管道并关闭它。然后在随后的 "Record" 中使用新的 GStreamer 管道重复该模式。
问题是我需要在流事件的句柄之间传递一个不纯的可变对象的实例。
FS2 文档建议使用块来使流有状态,所以我尝试了
def record(gStreamerPipeline: String, fileName: String)
(implicit sync: Sync[F]): F[Pipeline] =
{
... create and open pipeline ...
}
def stopRecording(pipe: Pipeline)(implicit sync: Sync[F]): F[Unit] = {
... stop pipeline, release resources ...
}
def effectPipe(pipelineDef: String)(implicit L: Logger[F]):
Pipe[F, CameraMessage, F[Unit]] = {
type CameraSessionHandle = Pipeline
type CameraStream = Stream[F, CameraSessionHandle]
s: Stream[F, CameraMessage] =>
s.scanChunks(Stream[F, CameraSessionHandle]()) {
case (s: CameraStream, c: Chunk[CameraMessage]) =>
c.last match {
case Some(Record(fileName)) =>
(Stream.bracket(record(pipelineDef, fileName))(p => stopRecording(p)), Chunk.empty)
case Some(StopRecording) =>
(Stream.empty, Chunk(s.compile.drain))
case _ =>
(s, Chunk.empty)
}
}
}
此代码的问题是,实际记录不会发生在 'Record' 事件上,而是会评估整个块的效果,即当 'StopRecording' 消息到达时,相机会打开并且然后立即再次关闭。
如何在不分块的情况下传递 "state"?或者还有其他方法可以达到我需要的结果吗?
这可能类似于
FS2 Stream with StateT[IO, _, _], periodically dumping state
但不同之处在于,在我的例子中,状态不是纯粹的数据结构,而是一种资源。
我最终能够使用 https://typelevel.org/blog/2018/06/07/shared-state-in-fp.html
中描述的可变引用模式解决它
代码如下:
import cats.effect._
import cats.syntax.all._
import fs2.Stream
import scala.concurrent.{ExecutionContext, ExecutionContextExecutor}
import scala.language.higherKinds
class FRef[F[_], T](implicit sync: Sync[F]) {
private var state: T = _
def set(n: T): F[Unit] = sync.delay(this.state = n)
def get: F[T] = sync.pure(state)
}
object FRef {
def apply[F[_], T](implicit sync: Sync[F]): F[FRef[F, T]] = sync.delay { new FRef() }
}
class CameraImpl(id: String) extends Camera {
override def record(): Unit = {
println(s"Recording $id")
}
override def stop(): Unit = {
println(s"Stopping $id")
}
override def free(): Unit = {
Thread.sleep(500)
println(s"Freeing $id")
}
}
object Camera {
def apply(id: String) = new CameraImpl(id)
}
trait Camera {
def record(): Unit
def stop(): Unit
def free(): Unit
}
sealed trait CameraMessage
case class Record(recordId: String) extends CameraMessage
case object StopRecording extends CameraMessage
class Streamer[F[_]](implicit sync: Sync[F]) {
def record(id: String): F[Camera] = {
sync.delay {
val r = Camera(id)
r.record()
r
}
}
def stopRecording(pipe: Camera): F[Unit] = {
sync.delay {
pipe.stop()
pipe.free()
}
}
def effectPipe(state: FRef[F, Option[Camera]])(
implicit sync: Sync[F]): Stream[F, CameraMessage] => Stream[F, Unit] = {
type CameraStream = Stream[F, Camera]
s: Stream[F, CameraMessage] =>
s.evalMap {
case Record(fileName) =>
for {
r <- record(fileName)
_ <- state.set(Some(r))
} yield ()
case StopRecording =>
for {
s <- state.get
_ <- stopRecording(s.get)
_ <- state.set(None)
} yield ()
}
}
}
object FS2Problem extends IOApp {
import scala.concurrent.duration._
override def run(args: List[String]): IO[ExitCode] = {
implicit val ec: ExecutionContextExecutor = ExecutionContext.global
val streamer = new Streamer[IO]
val s = Stream.awakeEvery[IO](5.seconds).take(10).zipWithIndex.map {
case (_, idx) =>
idx % 2 match {
case 0 =>
Record(s"Record $idx")
case _ =>
StopRecording
}
}
val ss = for {
streamerState <- Stream.eval(FRef[IO, Option[Camera]])
s <- s.through(streamer.effectPipe(streamerState))
} yield ()
ss.compile.drain.map(_ => ExitCode.Success)
}
}
我正在尝试实现一个控制相机的应用程序。相机命令表示为 CameraAction 对象流:
sealed trait CameraMessage
case object Record(recordId: String) extends CameraMessage
case object Stop extends CameraMessage
...
val s = Stream[F, CameraMessage]
假设我有一个测试流发出 "Record" 并在 20 秒后发出 "Stop" ,再过 20 秒后发出另一个 "Record" 消息等等,输入流是无限的。
然后应用程序使用 "Record" 它应该创建一个 GStreamer 管道的实例(即它是一个效果)和 "run" 它,在 "Stop" 它 'stops'管道并关闭它。然后在随后的 "Record" 中使用新的 GStreamer 管道重复该模式。
问题是我需要在流事件的句柄之间传递一个不纯的可变对象的实例。
FS2 文档建议使用块来使流有状态,所以我尝试了
def record(gStreamerPipeline: String, fileName: String)
(implicit sync: Sync[F]): F[Pipeline] =
{
... create and open pipeline ...
}
def stopRecording(pipe: Pipeline)(implicit sync: Sync[F]): F[Unit] = {
... stop pipeline, release resources ...
}
def effectPipe(pipelineDef: String)(implicit L: Logger[F]):
Pipe[F, CameraMessage, F[Unit]] = {
type CameraSessionHandle = Pipeline
type CameraStream = Stream[F, CameraSessionHandle]
s: Stream[F, CameraMessage] =>
s.scanChunks(Stream[F, CameraSessionHandle]()) {
case (s: CameraStream, c: Chunk[CameraMessage]) =>
c.last match {
case Some(Record(fileName)) =>
(Stream.bracket(record(pipelineDef, fileName))(p => stopRecording(p)), Chunk.empty)
case Some(StopRecording) =>
(Stream.empty, Chunk(s.compile.drain))
case _ =>
(s, Chunk.empty)
}
}
}
此代码的问题是,实际记录不会发生在 'Record' 事件上,而是会评估整个块的效果,即当 'StopRecording' 消息到达时,相机会打开并且然后立即再次关闭。
如何在不分块的情况下传递 "state"?或者还有其他方法可以达到我需要的结果吗?
这可能类似于 FS2 Stream with StateT[IO, _, _], periodically dumping state 但不同之处在于,在我的例子中,状态不是纯粹的数据结构,而是一种资源。
我最终能够使用 https://typelevel.org/blog/2018/06/07/shared-state-in-fp.html
中描述的可变引用模式解决它代码如下:
import cats.effect._
import cats.syntax.all._
import fs2.Stream
import scala.concurrent.{ExecutionContext, ExecutionContextExecutor}
import scala.language.higherKinds
class FRef[F[_], T](implicit sync: Sync[F]) {
private var state: T = _
def set(n: T): F[Unit] = sync.delay(this.state = n)
def get: F[T] = sync.pure(state)
}
object FRef {
def apply[F[_], T](implicit sync: Sync[F]): F[FRef[F, T]] = sync.delay { new FRef() }
}
class CameraImpl(id: String) extends Camera {
override def record(): Unit = {
println(s"Recording $id")
}
override def stop(): Unit = {
println(s"Stopping $id")
}
override def free(): Unit = {
Thread.sleep(500)
println(s"Freeing $id")
}
}
object Camera {
def apply(id: String) = new CameraImpl(id)
}
trait Camera {
def record(): Unit
def stop(): Unit
def free(): Unit
}
sealed trait CameraMessage
case class Record(recordId: String) extends CameraMessage
case object StopRecording extends CameraMessage
class Streamer[F[_]](implicit sync: Sync[F]) {
def record(id: String): F[Camera] = {
sync.delay {
val r = Camera(id)
r.record()
r
}
}
def stopRecording(pipe: Camera): F[Unit] = {
sync.delay {
pipe.stop()
pipe.free()
}
}
def effectPipe(state: FRef[F, Option[Camera]])(
implicit sync: Sync[F]): Stream[F, CameraMessage] => Stream[F, Unit] = {
type CameraStream = Stream[F, Camera]
s: Stream[F, CameraMessage] =>
s.evalMap {
case Record(fileName) =>
for {
r <- record(fileName)
_ <- state.set(Some(r))
} yield ()
case StopRecording =>
for {
s <- state.get
_ <- stopRecording(s.get)
_ <- state.set(None)
} yield ()
}
}
}
object FS2Problem extends IOApp {
import scala.concurrent.duration._
override def run(args: List[String]): IO[ExitCode] = {
implicit val ec: ExecutionContextExecutor = ExecutionContext.global
val streamer = new Streamer[IO]
val s = Stream.awakeEvery[IO](5.seconds).take(10).zipWithIndex.map {
case (_, idx) =>
idx % 2 match {
case 0 =>
Record(s"Record $idx")
case _ =>
StopRecording
}
}
val ss = for {
streamerState <- Stream.eval(FRef[IO, Option[Camera]])
s <- s.through(streamer.effectPipe(streamerState))
} yield ()
ss.compile.drain.map(_ => ExitCode.Success)
}
}