Akka Http 和 Circe,如何序列化包装在无标签最终方法中的实体(Cats Effect)
Akka Http and Circe, how to serialize entities wrapped in the tagless final approach (Cats Effect)
我正在构建一个学习 Scala 3 的玩具项目,但我遇到了一个问题,首先我正在遵循 tagless-final 使用 cats-effect 的方法,该方法按预期工作除了实体序列化,当我尝试使用 akka-http 创建路由时,我遇到以下问题:
def routes: Route = pathPrefix("security") {
(path("auth") & post) {
entity(as[LoginUserByCredentialsCommand]) {
(command: LoginUserByCredentialsCommand) =>
complete {
login(command)
}
}
}}
F[
Either[com.moralyzr.magickr.security.core.errors.AuthError,
com.moralyzr.magickr.security.core.types.TokenType.Token
]
]
Required: akka.http.scaladsl.marshalling.ToResponseMarshallable
where: F is a type in class SecurityApi with bounds <: [_] =>> Any
据我了解,akka-http 不知道如何序列化 F highly-kinded 类型,通过稍微搜索我找到了 following solution,它包括创建一个隐式调用 marshallable 来显示akka-http 如何序列化类型,但是当我实现它时我得到一个 Whosebug 错误:(
import akka.http.scaladsl.marshalling.ToResponseMarshaller
import cats.effect.IO
trait Marshallable[F[_]]:
def marshaller[A: ToResponseMarshaller]: ToResponseMarshaller[F[A]]
object Marshallable:
implicit def marshaller[F[_], A : ToResponseMarshaller](implicit M: Marshallable[F]): ToResponseMarshaller[F[A]] =
M.marshaller
given ioMarshaller: Marshallable[IO] with
def marshaller[A: ToResponseMarshaller] = implicitly
我现在真的被困住了,有没有人知道我该如何解决这个问题?完整代码可见here
编辑:这是登录码
为清楚起见,这里是实例化安全性 api 的 class 和安全性 api 本身
object Magickr extends IOApp:
override def run(args: List[String]): IO[ExitCode] =
val server = for {
// Actors
actorsSystem <- ActorsSystemResource[IO]()
streamMaterializer <- AkkaMaterializerResource[IO](actorsSystem)
// Configs
configs <- Resource.eval(MagickrConfigs.makeConfigs[IO]())
httpConfigs = AkkaHttpConfig[IO](configs)
databaseConfigs = DatabaseConfig[IO](configs)
flywayConfigs = FlywayConfig[IO](configs)
jwtConfig = JwtConfig[IO](configs)
// Interpreters
jwtManager = JwtBuilder[IO](jwtConfig)
authentication = InternalAuthentication[IO](
passwordValidationAlgebra = new SecurityValidationsInterpreter(),
jwtManager = jwtManager
)
// Database
_ <- Resource.eval(
DbMigrations.migrate[IO](flywayConfigs, databaseConfigs)
)
transactor <- DatabaseConnection.makeTransactor[IO](databaseConfigs)
userRepository = UserRepository[IO](transactor)
// Services
securityManagement = SecurityManagement[IO](
findUser = userRepository,
authentication = authentication
)
// Api
secApi = new SecurityApi[IO](securityManagement)
routes = pathPrefix("api") {
secApi.routes()
}
akkaHttp <- AkkaHttpResource.makeHttpServer[IO](
akkaHttpConfig = httpConfigs,
routes = routes,
actorSystem = actorsSystem,
materializer = streamMaterializer
)
} yield (actorsSystem)
return server.useForever
和
class SecurityApi[F[_]: Async](
private val securityManagement: SecurityManagement[F]
) extends LoginUserByCredentials[F]
with SecurityProtocols:
def routes()(using marshaller: Marshallable[F]): Route = pathPrefix("security") {
(path("auth") & post) {
entity(as[LoginUserByCredentialsCommand]) {
(command: LoginUserByCredentialsCommand) =>
complete {
login(command)
}
}
}
}
override def login(
command: LoginUserByCredentialsCommand
): F[Either[AuthError, Token]] =
securityManagement.loginWithCredentials(command = command).value
================= 编辑 2 ========================== ===============
根据 Luis Miguel 提供的见解,我更清楚地意识到我需要在 Marshaller 级别将 IO 解包到 Future 中,如下所示:
def ioToResponseMarshaller[A: ToResponseMarshaller](
M: Marshallable[IO]
): ToResponseMarshaller[IO[A]] =
Marshaller.futureMarshaller.compose(M.entity.unsafeToFuture())
但是,我遇到了这个问题:
Found: cats.effect.unsafe.IORuntime => scala.concurrent.Future[A]
Required: cats.effect.IO[A] => scala.concurrent.Future[A]
我想我很接近了!有没有办法打开 IO 以保持 IO 类型?
我成功了!感谢 @luismiguel 的洞察力,问题是 Akka HTTP Marshaller 无法处理 Cats-Effect IO monad,所以解决方案是使用编组器内部的 unsafeToFuture 解包 IO monad 的实现,这样我能够保持点到点的 Tagless-Final 风格,这里是解决方案:
此隐式获取类型的内部编组器
import akka.http.scaladsl.marshalling.ToResponseMarshaller
import cats.effect.IO
trait Marshallable[F[_]]:
def marshaller[A: ToResponseMarshaller]: ToResponseMarshaller[F[A]]
object Marshallable:
implicit def marshaller[F[_], A: ToResponseMarshaller](implicit
M: Marshallable[F]
): ToResponseMarshaller[F[A]] = M.marshaller
given ioMarshallable: Marshallable[IO] with
def marshaller[A: ToResponseMarshaller] = CatsEffectsMarshallers.ioMarshaller
这个解包 IO monad 并使用 future 对 marshaller 进行平面映射,akka-http 知道如何处理。
import akka.http.scaladsl.marshalling.{
LowPriorityToResponseMarshallerImplicits,
Marshaller,
ToResponseMarshaller
}
import cats.effect.IO
import cats.effect.unsafe.implicits.global
trait CatsEffectsMarshallers extends LowPriorityToResponseMarshallerImplicits:
implicit def ioMarshaller[A](implicit
m: ToResponseMarshaller[A]
): ToResponseMarshaller[IO[A]] =
Marshaller(implicit ec => _.unsafeToFuture().flatMap(m(_)))
object CatsEffectsMarshallers extends CatsEffectsMarshallers
我正在构建一个学习 Scala 3 的玩具项目,但我遇到了一个问题,首先我正在遵循 tagless-final 使用 cats-effect 的方法,该方法按预期工作除了实体序列化,当我尝试使用 akka-http 创建路由时,我遇到以下问题:
def routes: Route = pathPrefix("security") {
(path("auth") & post) {
entity(as[LoginUserByCredentialsCommand]) {
(command: LoginUserByCredentialsCommand) =>
complete {
login(command)
}
}
}}
F[
Either[com.moralyzr.magickr.security.core.errors.AuthError,
com.moralyzr.magickr.security.core.types.TokenType.Token
]
]
Required: akka.http.scaladsl.marshalling.ToResponseMarshallable
where: F is a type in class SecurityApi with bounds <: [_] =>> Any
据我了解,akka-http 不知道如何序列化 F highly-kinded 类型,通过稍微搜索我找到了 following solution,它包括创建一个隐式调用 marshallable 来显示akka-http 如何序列化类型,但是当我实现它时我得到一个 Whosebug 错误:(
import akka.http.scaladsl.marshalling.ToResponseMarshaller
import cats.effect.IO
trait Marshallable[F[_]]:
def marshaller[A: ToResponseMarshaller]: ToResponseMarshaller[F[A]]
object Marshallable:
implicit def marshaller[F[_], A : ToResponseMarshaller](implicit M: Marshallable[F]): ToResponseMarshaller[F[A]] =
M.marshaller
given ioMarshaller: Marshallable[IO] with
def marshaller[A: ToResponseMarshaller] = implicitly
我现在真的被困住了,有没有人知道我该如何解决这个问题?完整代码可见here
编辑:这是登录码
为清楚起见,这里是实例化安全性 api 的 class 和安全性 api 本身
object Magickr extends IOApp:
override def run(args: List[String]): IO[ExitCode] =
val server = for {
// Actors
actorsSystem <- ActorsSystemResource[IO]()
streamMaterializer <- AkkaMaterializerResource[IO](actorsSystem)
// Configs
configs <- Resource.eval(MagickrConfigs.makeConfigs[IO]())
httpConfigs = AkkaHttpConfig[IO](configs)
databaseConfigs = DatabaseConfig[IO](configs)
flywayConfigs = FlywayConfig[IO](configs)
jwtConfig = JwtConfig[IO](configs)
// Interpreters
jwtManager = JwtBuilder[IO](jwtConfig)
authentication = InternalAuthentication[IO](
passwordValidationAlgebra = new SecurityValidationsInterpreter(),
jwtManager = jwtManager
)
// Database
_ <- Resource.eval(
DbMigrations.migrate[IO](flywayConfigs, databaseConfigs)
)
transactor <- DatabaseConnection.makeTransactor[IO](databaseConfigs)
userRepository = UserRepository[IO](transactor)
// Services
securityManagement = SecurityManagement[IO](
findUser = userRepository,
authentication = authentication
)
// Api
secApi = new SecurityApi[IO](securityManagement)
routes = pathPrefix("api") {
secApi.routes()
}
akkaHttp <- AkkaHttpResource.makeHttpServer[IO](
akkaHttpConfig = httpConfigs,
routes = routes,
actorSystem = actorsSystem,
materializer = streamMaterializer
)
} yield (actorsSystem)
return server.useForever
和
class SecurityApi[F[_]: Async](
private val securityManagement: SecurityManagement[F]
) extends LoginUserByCredentials[F]
with SecurityProtocols:
def routes()(using marshaller: Marshallable[F]): Route = pathPrefix("security") {
(path("auth") & post) {
entity(as[LoginUserByCredentialsCommand]) {
(command: LoginUserByCredentialsCommand) =>
complete {
login(command)
}
}
}
}
override def login(
command: LoginUserByCredentialsCommand
): F[Either[AuthError, Token]] =
securityManagement.loginWithCredentials(command = command).value
================= 编辑 2 ========================== =============== 根据 Luis Miguel 提供的见解,我更清楚地意识到我需要在 Marshaller 级别将 IO 解包到 Future 中,如下所示:
def ioToResponseMarshaller[A: ToResponseMarshaller](
M: Marshallable[IO]
): ToResponseMarshaller[IO[A]] =
Marshaller.futureMarshaller.compose(M.entity.unsafeToFuture())
但是,我遇到了这个问题:
Found: cats.effect.unsafe.IORuntime => scala.concurrent.Future[A]
Required: cats.effect.IO[A] => scala.concurrent.Future[A]
我想我很接近了!有没有办法打开 IO 以保持 IO 类型?
我成功了!感谢 @luismiguel 的洞察力,问题是 Akka HTTP Marshaller 无法处理 Cats-Effect IO monad,所以解决方案是使用编组器内部的 unsafeToFuture 解包 IO monad 的实现,这样我能够保持点到点的 Tagless-Final 风格,这里是解决方案:
此隐式获取类型的内部编组器
import akka.http.scaladsl.marshalling.ToResponseMarshaller
import cats.effect.IO
trait Marshallable[F[_]]:
def marshaller[A: ToResponseMarshaller]: ToResponseMarshaller[F[A]]
object Marshallable:
implicit def marshaller[F[_], A: ToResponseMarshaller](implicit
M: Marshallable[F]
): ToResponseMarshaller[F[A]] = M.marshaller
given ioMarshallable: Marshallable[IO] with
def marshaller[A: ToResponseMarshaller] = CatsEffectsMarshallers.ioMarshaller
这个解包 IO monad 并使用 future 对 marshaller 进行平面映射,akka-http 知道如何处理。
import akka.http.scaladsl.marshalling.{
LowPriorityToResponseMarshallerImplicits,
Marshaller,
ToResponseMarshaller
}
import cats.effect.IO
import cats.effect.unsafe.implicits.global
trait CatsEffectsMarshallers extends LowPriorityToResponseMarshallerImplicits:
implicit def ioMarshaller[A](implicit
m: ToResponseMarshaller[A]
): ToResponseMarshaller[IO[A]] =
Marshaller(implicit ec => _.unsafeToFuture().flatMap(m(_)))
object CatsEffectsMarshallers extends CatsEffectsMarshallers