使用 Scala + Slick + MySQL+ Akka + Stream 时面临的问题
Facing Issue in using Scala + Slick + MySQL+ Akka + Stream
问题陈述:我们正在将 MySQL DB table 中特定模块的用户所有传入请求参数添加为一行(这是一个巨大的数据)。现在,我们要设计一个进程,它将从此 table 中读取每条记录,并通过调用第三方 API 获取有关该用户请求的更多信息,然后将此 returned 元信息在另一个 table 中。
当前尝试:
我正在使用 Scala + Slick 来执行此操作。由于要读取的数据很大,所以我想一次读取这个table行并处理它。我尝试使用 slick + akka 流,但是我得到 'java.util.concurrent.RejectedExecutionException'
以下是我试过的粗略逻辑,
implicit val system = ActorSystem("Example")
import system.dispatcher
implicit val materializer = ActorMaterializer()
val future = db.stream(SomeQuery.result)
Source.fromPublisher(future).map(row => {
id = dataEnrichmentAPI.process(row)
}).runForeach(id => println("Processed row : "+ id))
dataEnrichmentAPI.process :此函数进行第三方 REST 调用,还进行一些数据库查询以获取所需数据。此数据库查询是使用 'db.run' 方法完成的,它也会等待直到完成(使用等待)
例如,
def process(row: RequestRecord): Int = {
// SomeQuery2 = Check if data is already there in DB
val retId: Seq[Int] = Await.result(db.run(SomeQuery2.result), Duration.Inf)
if(retId.isEmpty){
val metaData = RestCall()
// SomeQuery3 = Store this metaData in DB
Await.result(db.run(SomeQuery3.result), Duration.Inf)
return metaData.id;
}else{
// SomeQuery4 = Get meta data id
return Await.result(db.run(SomeQuery4.result), Duration.Inf)
}
}
我在使用对数据库的阻塞调用时遇到此异常。我不认为我是否可以摆脱它,因为 return 值是以后流程继续所必需的。
'blocking call' 是否是此异常背后的原因?
解决此类问题的最佳做法是什么?
谢谢。
我不知道这是不是你的问题(细节太少),但你永远不应该阻止。
说到最佳实践,我们 async stages 代替。
这或多或少是你的代码在不使用 Await.result:
的情况下的样子
def process(row: RequestRecord): Future[Int] = {
db.run(SomeQuery2.result) flatMap {
case retId if retId.isEmpty =>
// what is this? is it a sync call? if it's a rest call it should return a future
val metaData = RestCall()
db.run(SomeQuery3.result).map(_ => metaData.id)
case _ => db.run(SomeQuery4.result)
}
}
Source.fromPublisher(db.stream(SomeQuery.result))
// choose your own parallelism
.mapAsync(2)(dataEnrichmentAPI.process)
.runForeach(id => println("Processed row : "+ id))
通过这种方式,您将明确地、惯用地处理背压和并行性。
尝试从不在生产代码中调用Await.result并且只compose futures using map, flatMap and for comprehensions
问题陈述:我们正在将 MySQL DB table 中特定模块的用户所有传入请求参数添加为一行(这是一个巨大的数据)。现在,我们要设计一个进程,它将从此 table 中读取每条记录,并通过调用第三方 API 获取有关该用户请求的更多信息,然后将此 returned 元信息在另一个 table 中。
当前尝试:
我正在使用 Scala + Slick 来执行此操作。由于要读取的数据很大,所以我想一次读取这个table行并处理它。我尝试使用 slick + akka 流,但是我得到 'java.util.concurrent.RejectedExecutionException'
以下是我试过的粗略逻辑,
implicit val system = ActorSystem("Example")
import system.dispatcher
implicit val materializer = ActorMaterializer()
val future = db.stream(SomeQuery.result)
Source.fromPublisher(future).map(row => {
id = dataEnrichmentAPI.process(row)
}).runForeach(id => println("Processed row : "+ id))
dataEnrichmentAPI.process :此函数进行第三方 REST 调用,还进行一些数据库查询以获取所需数据。此数据库查询是使用 'db.run' 方法完成的,它也会等待直到完成(使用等待)
例如,
def process(row: RequestRecord): Int = {
// SomeQuery2 = Check if data is already there in DB
val retId: Seq[Int] = Await.result(db.run(SomeQuery2.result), Duration.Inf)
if(retId.isEmpty){
val metaData = RestCall()
// SomeQuery3 = Store this metaData in DB
Await.result(db.run(SomeQuery3.result), Duration.Inf)
return metaData.id;
}else{
// SomeQuery4 = Get meta data id
return Await.result(db.run(SomeQuery4.result), Duration.Inf)
}
}
我在使用对数据库的阻塞调用时遇到此异常。我不认为我是否可以摆脱它,因为 return 值是以后流程继续所必需的。
'blocking call' 是否是此异常背后的原因? 解决此类问题的最佳做法是什么?
谢谢。
我不知道这是不是你的问题(细节太少),但你永远不应该阻止。
说到最佳实践,我们 async stages 代替。 这或多或少是你的代码在不使用 Await.result:
的情况下的样子def process(row: RequestRecord): Future[Int] = {
db.run(SomeQuery2.result) flatMap {
case retId if retId.isEmpty =>
// what is this? is it a sync call? if it's a rest call it should return a future
val metaData = RestCall()
db.run(SomeQuery3.result).map(_ => metaData.id)
case _ => db.run(SomeQuery4.result)
}
}
Source.fromPublisher(db.stream(SomeQuery.result))
// choose your own parallelism
.mapAsync(2)(dataEnrichmentAPI.process)
.runForeach(id => println("Processed row : "+ id))
通过这种方式,您将明确地、惯用地处理背压和并行性。
尝试从不在生产代码中调用Await.result并且只compose futures using map, flatMap and for comprehensions