Akka gRPC + Slick 应用程序导致 "IllegalStateException: Cannot initialize ExecutionContext; AsyncExecutor already shut down"
Akka gRPC + Slick application causes "IllegalStateException: Cannot initialize ExecutionContext; AsyncExecutor already shut down"
我尝试使用 Akka-gRPC and Slick. I also use Airframe 为 DI 开发 gRPC 服务器。
源代码是here
问题是作为gRPC服务器执行时如果收到请求会导致失败。
如果它不是作为gRPC服务器启动,而是只是从数据库中读取资源,则该过程成功。
有什么区别?
接下来,它使用 slick 从数据库中读取对象。
...Component
是机身对象。它将被主模块使用。
trait UserRepository {
def getUser: Future[Seq[Tables.UsersRow]]
}
class UserRepositoryImpl(val profile: JdbcProfile, val db: JdbcProfile#Backend#Database) extends UserRepository {
import profile.api._
def getUser: Future[Seq[Tables.UsersRow]] = db.run(Tables.Users.result)
}
trait UserResolveService {
private val repository = bind[UserRepository]
def getAll: Future[Seq[Tables.UsersRow]] =
repository.getUser
}
object userServiceComponent {
val design = newDesign
.bind[UserResolveService]
.toSingleton
}
gRPC 服务器源代码如下。
trait UserServiceImpl extends UserService {
private val userResolveService = bind[UserResolveService]
private val system: ActorSystem = bind[ActorSystem]
implicit val ec: ExecutionContextExecutor = system.dispatcher
override def getAll(in: GetUserListRequest): Future[GetUserListResponse] = {
userResolveService.getAll.map(us =>
GetUserListResponse(
us.map(u =>
myapp.proto.user.User(
1,
"t_horikoshi@example.com",
"t_horikoshi",
myapp.proto.user.User.UserRole.Admin
)
)
)
)
}
}
trait GRPCServer {
private val userServiceImpl = bind[UserServiceImpl]
implicit val system: ActorSystem = bind[ActorSystem]
def run(): Future[Http.ServerBinding] = {
implicit def ec: ExecutionContext = system.dispatcher
val service: PartialFunction[HttpRequest, Future[HttpResponse]] =
UserServiceHandler.partial(userServiceImpl)
val reflection: PartialFunction[HttpRequest, Future[HttpResponse]] =
ServerReflection.partial(List(UserService))
// Akka HTTP 10.1 requires adapters to accept the new actors APIs
val bound = Http().bindAndHandleAsync(
ServiceHandler.concatOrNotFound(service, reflection),
interface = "127.0.0.1",
port = 8080,
settings = ServerSettings(system)
)
bound.onComplete {
case Success(binding) =>
system.log.info(
s"gRPC Server online at http://${binding.localAddress.getHostName}:${binding.localAddress.getPort}/"
)
case Failure(ex) =>
system.log.error(ex, "occurred error")
}
bound
}
}
object grpcComponent {
val design = newDesign
.bind[UserServiceImpl]
.toSingleton
.bind[GRPCServer]
.toSingleton
}
以下是主模块。
object Main extends App {
val conf = ConfigFactory
.parseString("akka.http.server.preview.enable-http2 = on")
.withFallback(ConfigFactory.defaultApplication())
val system = ActorSystem("GRPCServer", conf)
val dbConfig: DatabaseConfig[JdbcProfile] =
DatabaseConfig.forConfig[JdbcProfile](path = "mydb")
val design = newDesign
.bind[JdbcProfile]
.toInstance(dbConfig.profile)
.bind[JdbcProfile#Backend#Database]
.toInstance(dbConfig.db)
.bind[UserRepository]
.to[UserRepositoryImpl]
.bind[ActorSystem]
.toInstance(system)
.add(userServiceComponent.design)
.add(grpcComponent.design)
design.withSession(s =>
// Await.result(s.build[UserResolveService].getUser, Duration.Inf)) // success
// Await.result(s.build[UserServiceImpl].getAll(GetUserListRequest()), Duration.Inf)) // success
s.build[GRPCServer].run() // cause IllegalStateException when reciece request.
)
}
当直接调用UserResolveService
和UserServiceImpl
时,从数据库加载对象的过程成功。
但是,当运行应用程序作为gRPC Server时,收到请求时出现错误。
想了一天,还是下不了决心...
请你帮我解决一下好吗
已解决。如果执行异步进程,它必须使用 newSession 启动 gRPC 服务器。
我喜欢 that.
我尝试使用 Akka-gRPC and Slick. I also use Airframe 为 DI 开发 gRPC 服务器。
源代码是here
问题是作为gRPC服务器执行时如果收到请求会导致失败。 如果它不是作为gRPC服务器启动,而是只是从数据库中读取资源,则该过程成功。
有什么区别?
接下来,它使用 slick 从数据库中读取对象。
...Component
是机身对象。它将被主模块使用。
trait UserRepository {
def getUser: Future[Seq[Tables.UsersRow]]
}
class UserRepositoryImpl(val profile: JdbcProfile, val db: JdbcProfile#Backend#Database) extends UserRepository {
import profile.api._
def getUser: Future[Seq[Tables.UsersRow]] = db.run(Tables.Users.result)
}
trait UserResolveService {
private val repository = bind[UserRepository]
def getAll: Future[Seq[Tables.UsersRow]] =
repository.getUser
}
object userServiceComponent {
val design = newDesign
.bind[UserResolveService]
.toSingleton
}
gRPC 服务器源代码如下。
trait UserServiceImpl extends UserService {
private val userResolveService = bind[UserResolveService]
private val system: ActorSystem = bind[ActorSystem]
implicit val ec: ExecutionContextExecutor = system.dispatcher
override def getAll(in: GetUserListRequest): Future[GetUserListResponse] = {
userResolveService.getAll.map(us =>
GetUserListResponse(
us.map(u =>
myapp.proto.user.User(
1,
"t_horikoshi@example.com",
"t_horikoshi",
myapp.proto.user.User.UserRole.Admin
)
)
)
)
}
}
trait GRPCServer {
private val userServiceImpl = bind[UserServiceImpl]
implicit val system: ActorSystem = bind[ActorSystem]
def run(): Future[Http.ServerBinding] = {
implicit def ec: ExecutionContext = system.dispatcher
val service: PartialFunction[HttpRequest, Future[HttpResponse]] =
UserServiceHandler.partial(userServiceImpl)
val reflection: PartialFunction[HttpRequest, Future[HttpResponse]] =
ServerReflection.partial(List(UserService))
// Akka HTTP 10.1 requires adapters to accept the new actors APIs
val bound = Http().bindAndHandleAsync(
ServiceHandler.concatOrNotFound(service, reflection),
interface = "127.0.0.1",
port = 8080,
settings = ServerSettings(system)
)
bound.onComplete {
case Success(binding) =>
system.log.info(
s"gRPC Server online at http://${binding.localAddress.getHostName}:${binding.localAddress.getPort}/"
)
case Failure(ex) =>
system.log.error(ex, "occurred error")
}
bound
}
}
object grpcComponent {
val design = newDesign
.bind[UserServiceImpl]
.toSingleton
.bind[GRPCServer]
.toSingleton
}
以下是主模块。
object Main extends App {
val conf = ConfigFactory
.parseString("akka.http.server.preview.enable-http2 = on")
.withFallback(ConfigFactory.defaultApplication())
val system = ActorSystem("GRPCServer", conf)
val dbConfig: DatabaseConfig[JdbcProfile] =
DatabaseConfig.forConfig[JdbcProfile](path = "mydb")
val design = newDesign
.bind[JdbcProfile]
.toInstance(dbConfig.profile)
.bind[JdbcProfile#Backend#Database]
.toInstance(dbConfig.db)
.bind[UserRepository]
.to[UserRepositoryImpl]
.bind[ActorSystem]
.toInstance(system)
.add(userServiceComponent.design)
.add(grpcComponent.design)
design.withSession(s =>
// Await.result(s.build[UserResolveService].getUser, Duration.Inf)) // success
// Await.result(s.build[UserServiceImpl].getAll(GetUserListRequest()), Duration.Inf)) // success
s.build[GRPCServer].run() // cause IllegalStateException when reciece request.
)
}
当直接调用UserResolveService
和UserServiceImpl
时,从数据库加载对象的过程成功。
但是,当运行应用程序作为gRPC Server时,收到请求时出现错误。
想了一天,还是下不了决心... 请你帮我解决一下好吗
已解决。如果执行异步进程,它必须使用 newSession 启动 gRPC 服务器。 我喜欢 that.