"Spawn" WebSocket 端点中的并发效果
"Spawn" concurrent effect in a WebSocket endpoint
我有以下代码:
class ApiRoutes2[F[_]](implicit F: ConcurrentEffect[F]) extends Http4sDsl[F] {
var queue = Queue.bounded[F, String](100)
def createService(queue: Queue[F, String]): F[Unit] = ???
val service: HttpRoutes[F] = HttpRoutes.of[F] {
case PUT -> Root / "services" =>
val toClientF: F[Stream[F, WebSocketFrame]] = queue.map(_.dequeue.map(t => Text(t)))
val fromClient: Pipe[F, WebSocketFrame, Unit] = _.evalMap {
case Text(t, _) => F.delay(println(t))
case f => F.delay(println(s"Unknown type: $f"))
}
// How to "spawn" createService?
toClientF.flatMap { toClient =>
WebSocketBuilder[F].build(toClient, fromClient)
}
}
}
createService
是创建新服务的函数。创建新服务是一个非常复杂的过程,它涉及触发 CI 个管道,等待它们完成,然后以相同的方式触发更多 CI 个管道。它收到的队列将用于向浏览器报告当前正在执行的操作。
我想同时 "spawn" createService 并让它 运行 直到它完成。但是与此同时,我想立即 return 将 WebSocket 发送到客户端。也就是说,我无法在 "spawning" createService.
时阻止
我卡住了。我只能想到使用 shift
但这意味着 for comprehension 中的下一行将阻止等待 createService
完成然后 return websocket 到客户端。
我的做法是不是错了?我做错了什么?
由于 F
是 ConcurrentEffect
的一个实例,您还有一个 Concurrent
实例。
因此您可以使用 Concurrent[F].start
which returns a Fiber
到 运行 操作(如果不需要 cancel/ensure 虽然完成了)。
val service: HttpRoutes[F] = HttpRoutes.of[F] {
case PUT -> Root / "services" =>
val toClientF: F[Stream[F, WebSocketFrame]] = queue.map(_.dequeue.map(t => Text(t)))
val fromClient: Pipe[F, WebSocketFrame, Unit] = _.evalMap {
case Text(t, _) => F.delay(println(t))
case f => F.delay(println(s"Unknown type: $f"))
}
for {
toClient <- toClientF
_ <- Concurrent[F].start(createService)
websocket <- WebSocketBuilder[F].build(toClient, fromClient)
} yield websocket
}
我有以下代码:
class ApiRoutes2[F[_]](implicit F: ConcurrentEffect[F]) extends Http4sDsl[F] {
var queue = Queue.bounded[F, String](100)
def createService(queue: Queue[F, String]): F[Unit] = ???
val service: HttpRoutes[F] = HttpRoutes.of[F] {
case PUT -> Root / "services" =>
val toClientF: F[Stream[F, WebSocketFrame]] = queue.map(_.dequeue.map(t => Text(t)))
val fromClient: Pipe[F, WebSocketFrame, Unit] = _.evalMap {
case Text(t, _) => F.delay(println(t))
case f => F.delay(println(s"Unknown type: $f"))
}
// How to "spawn" createService?
toClientF.flatMap { toClient =>
WebSocketBuilder[F].build(toClient, fromClient)
}
}
}
createService
是创建新服务的函数。创建新服务是一个非常复杂的过程,它涉及触发 CI 个管道,等待它们完成,然后以相同的方式触发更多 CI 个管道。它收到的队列将用于向浏览器报告当前正在执行的操作。
我想同时 "spawn" createService 并让它 运行 直到它完成。但是与此同时,我想立即 return 将 WebSocket 发送到客户端。也就是说,我无法在 "spawning" createService.
时阻止我卡住了。我只能想到使用 shift
但这意味着 for comprehension 中的下一行将阻止等待 createService
完成然后 return websocket 到客户端。
我的做法是不是错了?我做错了什么?
由于 F
是 ConcurrentEffect
的一个实例,您还有一个 Concurrent
实例。
因此您可以使用 Concurrent[F].start
which returns a Fiber
到 运行 操作(如果不需要 cancel/ensure 虽然完成了)。
val service: HttpRoutes[F] = HttpRoutes.of[F] {
case PUT -> Root / "services" =>
val toClientF: F[Stream[F, WebSocketFrame]] = queue.map(_.dequeue.map(t => Text(t)))
val fromClient: Pipe[F, WebSocketFrame, Unit] = _.evalMap {
case Text(t, _) => F.delay(println(t))
case f => F.delay(println(s"Unknown type: $f"))
}
for {
toClient <- toClientF
_ <- Concurrent[F].start(createService)
websocket <- WebSocketBuilder[F].build(toClient, fromClient)
} yield websocket
}