FS2 将资源(或效果)作为状态传递

FS2 passing resource (or effect) as a state

我正在尝试实现一个控制相机的应用程序。相机命令表示为 CameraAction 对象流:

sealed trait CameraMessage
case object Record(recordId: String) extends CameraMessage
case object Stop extends CameraMessage

...

val s = Stream[F, CameraMessage]

假设我有一个测试流发出 "Record" 并在 20 秒后发出 "Stop" ,再过 20 秒后发出另一个 "Record" 消息等等,输入流是无限的。

然后应用程序使用 "Record" 它应该创建一个 GStreamer 管道的实例(即它是一个效果)和 "run" 它,在 "Stop" 它 'stops'管道并关闭它。然后在随后的 "Record" 中使用新的 GStreamer 管道重复该模式。

问题是我需要在流事件的句柄之间传递一个不纯的可变对象的实例。

FS2 文档建议使用块来使流有状态,所以我尝试了


def record(gStreamerPipeline: String, fileName: String)
(implicit sync: Sync[F]): F[Pipeline] = 
{ 
... create and open pipeline ... 
}

def stopRecording(pipe: Pipeline)(implicit sync: Sync[F]): F[Unit] = {
 ... stop pipeline, release resources ... 
}

def effectPipe(pipelineDef: String)(implicit L: Logger[F]): 
Pipe[F, CameraMessage, F[Unit]] = {
    type CameraSessionHandle = Pipeline
    type CameraStream = Stream[F, CameraSessionHandle]

    s: Stream[F, CameraMessage] =>
      s.scanChunks(Stream[F, CameraSessionHandle]()) {
        case (s: CameraStream, c: Chunk[CameraMessage]) =>
          c.last match {
            case Some(Record(fileName)) =>
              (Stream.bracket(record(pipelineDef, fileName))(p => stopRecording(p)), Chunk.empty)
            case Some(StopRecording) =>
              (Stream.empty, Chunk(s.compile.drain))
            case _ =>
              (s, Chunk.empty)
          }
      }
  }

此代码的问题是,实际记录不会发生在 'Record' 事件上,而是会评估整个块的效果,即当 'StopRecording' 消息到达时,相机会打开并且然后立即再次关闭。

如何在不分块的情况下传递 "state"?或者还有其他方法可以达到我需要的结果吗?

这可能类似于 FS2 Stream with StateT[IO, _, _], periodically dumping state 但不同之处在于,在我的例子中,状态不是纯粹的数据结构,而是一种资源。

我最终能够使用 https://typelevel.org/blog/2018/06/07/shared-state-in-fp.html

中描述的可变引用模式解决它

代码如下:

import cats.effect._
import cats.syntax.all._
import fs2.Stream

import scala.concurrent.{ExecutionContext, ExecutionContextExecutor}
import scala.language.higherKinds

class FRef[F[_], T](implicit sync: Sync[F]) {
  private var state: T = _
  def set(n: T): F[Unit] = sync.delay(this.state = n)
  def get: F[T] = sync.pure(state)
}

object FRef {
  def apply[F[_], T](implicit sync: Sync[F]): F[FRef[F, T]] = sync.delay { new FRef() }
}

class CameraImpl(id: String) extends Camera {

  override def record(): Unit = {
    println(s"Recording $id")
  }

  override def stop(): Unit = {
    println(s"Stopping $id")
  }

  override def free(): Unit = {
    Thread.sleep(500)
    println(s"Freeing $id")
  }
}

object Camera {
  def apply(id: String) = new CameraImpl(id)
}

trait Camera {
  def record(): Unit
  def stop(): Unit
  def free(): Unit
}

sealed trait CameraMessage
case class Record(recordId: String) extends CameraMessage
case object StopRecording extends CameraMessage

class Streamer[F[_]](implicit sync: Sync[F]) {

  def record(id: String): F[Camera] = {
    sync.delay {
      val r = Camera(id)
      r.record()
      r
    }
  }

  def stopRecording(pipe: Camera): F[Unit] = {
    sync.delay {
      pipe.stop()
      pipe.free()
    }
  }

  def effectPipe(state: FRef[F, Option[Camera]])(
      implicit sync: Sync[F]): Stream[F, CameraMessage] => Stream[F, Unit] = {
    type CameraStream = Stream[F, Camera]

    s: Stream[F, CameraMessage] =>
      s.evalMap {
        case Record(fileName) =>
          for {
            r <- record(fileName)
            _ <- state.set(Some(r))
          } yield ()
        case StopRecording =>
          for {
            s <- state.get
            _ <- stopRecording(s.get)
            _ <- state.set(None)
          } yield ()
      }
  }
}

object FS2Problem extends IOApp {
  import scala.concurrent.duration._

  override def run(args: List[String]): IO[ExitCode] = {

    implicit val ec: ExecutionContextExecutor = ExecutionContext.global

    val streamer = new Streamer[IO]

    val s = Stream.awakeEvery[IO](5.seconds).take(10).zipWithIndex.map {
      case (_, idx) =>
        idx % 2 match {
          case 0 =>
            Record(s"Record $idx")
          case _ =>
            StopRecording
        }
    }

    val ss = for {
      streamerState <- Stream.eval(FRef[IO, Option[Camera]])
      s <- s.through(streamer.effectPipe(streamerState))
    } yield ()

    ss.compile.drain.map(_ => ExitCode.Success)
  }
}