Scala:文件数据的顺序处理
Scala: Sequential processing of file data
我有一个 csv 文件,我从中读取数据并填充我的数据库。我正在使用 scala 来做到这一点。我不想以并行方式触发数据库插入,而是想以顺序方式执行插入(即一个接一个)。我不愿意在 for 循环中使用 Await。除了使用 await 之外还有其他方法吗?
P.S:我已经将 1000 个条目从 csv 读取到一个列表,并在列表上循环以创建数据库插入
假设你的数据库有某种save(entity: T): Future[_]
方法,你可以用 flatMap 折叠你的期货(或为了理解):
def saveAll(entities: List[T]): Future[Unit]
entities.foldLeft(Future.successful(())){
case (f, entity) => for {
_ <- f
_ <- save(entity)
} yield ()
}
}
另一个选项是递归函数。不如 foldLeft
简洁,但对某些人来说更具可读性。还有一个选项供您考虑(假设 save(entity: T): Future[R]
:
def saveAll(entities: List[T]): Future[List[R]] = {
entities.headOption match {
case Some(entity) =>
for {
head <- save(entity)
tail <- saveAll(entities.tail)
} yield {
head :: tail
}
case None =>
Future.successful(Nil)
}
}
还有另一种选择,如果您的 save
方法允许您提供自己的 ExecutionContext
,即 save(entity: T)(implicit ec: ExecutionContext): Future[R]
,则只需同时触发 Future
,但使用单个线程执行上下文:
def saveAll(entities: List[T]): Future[List[R]] = {
implicit ec = ExecutionContext.fromExecutionService(java.util.concurrent.Executors.newSingleThreadExecutor)
Future.sequence(entities.map(save))
}
我有一个 csv 文件,我从中读取数据并填充我的数据库。我正在使用 scala 来做到这一点。我不想以并行方式触发数据库插入,而是想以顺序方式执行插入(即一个接一个)。我不愿意在 for 循环中使用 Await。除了使用 await 之外还有其他方法吗?
P.S:我已经将 1000 个条目从 csv 读取到一个列表,并在列表上循环以创建数据库插入
假设你的数据库有某种save(entity: T): Future[_]
方法,你可以用 flatMap 折叠你的期货(或为了理解):
def saveAll(entities: List[T]): Future[Unit]
entities.foldLeft(Future.successful(())){
case (f, entity) => for {
_ <- f
_ <- save(entity)
} yield ()
}
}
另一个选项是递归函数。不如 foldLeft
简洁,但对某些人来说更具可读性。还有一个选项供您考虑(假设 save(entity: T): Future[R]
:
def saveAll(entities: List[T]): Future[List[R]] = {
entities.headOption match {
case Some(entity) =>
for {
head <- save(entity)
tail <- saveAll(entities.tail)
} yield {
head :: tail
}
case None =>
Future.successful(Nil)
}
}
还有另一种选择,如果您的 save
方法允许您提供自己的 ExecutionContext
,即 save(entity: T)(implicit ec: ExecutionContext): Future[R]
,则只需同时触发 Future
,但使用单个线程执行上下文:
def saveAll(entities: List[T]): Future[List[R]] = {
implicit ec = ExecutionContext.fromExecutionService(java.util.concurrent.Executors.newSingleThreadExecutor)
Future.sequence(entities.map(save))
}