使用流对多个函数调用建模(以安全的 FP 方式)
Model multiple function calls with a stream (in a safe, FP way)
给定一个函数 A => IO[B]
(又名 Kleisli[IO, A, B]
),该函数将被多次调用并具有副作用,例如更新数据库,如何将对它的多次调用委托给一个流(我猜 Pipe[IO, A, B]
)(fs2,monix observable/iterant)?这样做的原因是能够累积状态,在一段时间内一起批量调用 window 等
更具体地说,http4s 服务器需要一个 Request => IO[Response]
,所以我正在研究如何对流进行操作(为了上述好处),但最终还是为 http4s 提供了这样的功能。
我怀疑它在幕后需要一些关联 ID,我对此没有意见,我更感兴趣的是如何从 FP 的角度安全正确地完成它。
最终,我期望的签名可能是这样的:
Pipe[IO, A, B] => (A => IO[B])
,这样对 Kleisli 的调用就会通过管道传输。
事后想想,是否有可能背压?
一个想法是使用 MPSC(多发布者单一消费者)对其进行建模。我将以 Monix 为例,因为我对它更熟悉,但即使你使用 FS2,这个想法也保持不变。
object MPSC extends App {
sealed trait Event
object Event {
// You'll need a promise in order to send the response back to user
case class SaveItem(num: Int, promise: Deferred[Task, Int]) extends Event
}
// For backpressure, take a look at `PublishSubject`.
val cs = ConcurrentSubject[Event](MulticastStrategy.Publish)
def pushEvent(num: Int) = {
for {
promise <- Deferred[Task, Int]
_ <- Task.delay(cs.onNext(SaveItem(num, promise)))
} yield promise
}
// You get a list of events now since it is buffered
// Monix has a lot of buffer strategies, check the docs for more details
def processEvents(items: Seq[Event]): Task[Unit] = {
Task.delay(println(s"Items: $items")) >>
Task.traverse(items) {
case SaveItem(_, promise) => promise.complete(Random.nextInt(100))
}.void
}
val app = for {
// Start the stream in the background
_ <- cs
.bufferTimed(3.seconds) // Buffer all events within 3 seconds
.filter(_.nonEmpty)
.mapEval(processEvents)
.completedL
.startAndForget
_ <- Task.sleep(1.second)
p1 <- pushEvent(10)
p2 <- pushEvent(20)
p3 <- pushEvent(30)
// Wait for the promise to complete, you'll do this for each request
x <- p1.get
y <- p2.get
z <- p3.get
_ <- Task.delay(println(s"Completed promise: [$x, $y, $z]"))
} yield ()
app.runSyncUnsafe()
}
给定一个函数 A => IO[B]
(又名 Kleisli[IO, A, B]
),该函数将被多次调用并具有副作用,例如更新数据库,如何将对它的多次调用委托给一个流(我猜 Pipe[IO, A, B]
)(fs2,monix observable/iterant)?这样做的原因是能够累积状态,在一段时间内一起批量调用 window 等
更具体地说,http4s 服务器需要一个 Request => IO[Response]
,所以我正在研究如何对流进行操作(为了上述好处),但最终还是为 http4s 提供了这样的功能。
我怀疑它在幕后需要一些关联 ID,我对此没有意见,我更感兴趣的是如何从 FP 的角度安全正确地完成它。
最终,我期望的签名可能是这样的:
Pipe[IO, A, B] => (A => IO[B])
,这样对 Kleisli 的调用就会通过管道传输。
事后想想,是否有可能背压?
一个想法是使用 MPSC(多发布者单一消费者)对其进行建模。我将以 Monix 为例,因为我对它更熟悉,但即使你使用 FS2,这个想法也保持不变。
object MPSC extends App {
sealed trait Event
object Event {
// You'll need a promise in order to send the response back to user
case class SaveItem(num: Int, promise: Deferred[Task, Int]) extends Event
}
// For backpressure, take a look at `PublishSubject`.
val cs = ConcurrentSubject[Event](MulticastStrategy.Publish)
def pushEvent(num: Int) = {
for {
promise <- Deferred[Task, Int]
_ <- Task.delay(cs.onNext(SaveItem(num, promise)))
} yield promise
}
// You get a list of events now since it is buffered
// Monix has a lot of buffer strategies, check the docs for more details
def processEvents(items: Seq[Event]): Task[Unit] = {
Task.delay(println(s"Items: $items")) >>
Task.traverse(items) {
case SaveItem(_, promise) => promise.complete(Random.nextInt(100))
}.void
}
val app = for {
// Start the stream in the background
_ <- cs
.bufferTimed(3.seconds) // Buffer all events within 3 seconds
.filter(_.nonEmpty)
.mapEval(processEvents)
.completedL
.startAndForget
_ <- Task.sleep(1.second)
p1 <- pushEvent(10)
p2 <- pushEvent(20)
p3 <- pushEvent(30)
// Wait for the promise to complete, you'll do this for each request
x <- p1.get
y <- p2.get
z <- p3.get
_ <- Task.delay(println(s"Completed promise: [$x, $y, $z]"))
} yield ()
app.runSyncUnsafe()
}