在 ConnectionIO 事务中并行化操作
Parallelizing operations within a ConnectionIO transaction
所以我有一个程序,我从数据库中获取文件路径列表,删除文件系统上的那些文件,最后从数据库中删除文件路径。我将所有操作放在一个事务中,以确保路径将从数据库中删除,前提是文件系统中的所有文件都被删除。
像这样
val result = for {
deletePath <- (fr""" select path from files""").query[String].stream //Stream[doobie.ConnectionIO,String]
_ <- Stream.eval(AsyncConnectionIO.liftIO(File(deletePath).delete()) //Stream[doobie.ConnectionIO,Unit]
_ <- Stream.eval(sql"delete from files where path = ${deletePath}".withUniqueGeneratedKeys)
}
result.compile.drain.transact(transactor)
不幸的是,文件系统是分布式的,这意味着单个操作很慢,但它允许同时进行多个操作。
所以我的问题是,如何在此处并行执行文件系统删除操作?
是的,你可以。只需使用适当的组合器而不是 for
语法。
val result =
(fr""" select path from files""")
.query[String]
.stream
.parEvalMapUnordered(maxConcurrent = 64) { deletePath =>
AsyncConnectionIO.liftIO(File(deletePath).delete()) >>
sql"delete from files where path = ${deletePath}".withUniqueGeneratedKeys
}
result.compile.drain.transact(transactor)
记得将 maxConcurrent
参数更改为对您的用例有意义的参数。
(我无法测试代码,所以可能有一些错别字)
所以我有一个程序,我从数据库中获取文件路径列表,删除文件系统上的那些文件,最后从数据库中删除文件路径。我将所有操作放在一个事务中,以确保路径将从数据库中删除,前提是文件系统中的所有文件都被删除。
像这样
val result = for {
deletePath <- (fr""" select path from files""").query[String].stream //Stream[doobie.ConnectionIO,String]
_ <- Stream.eval(AsyncConnectionIO.liftIO(File(deletePath).delete()) //Stream[doobie.ConnectionIO,Unit]
_ <- Stream.eval(sql"delete from files where path = ${deletePath}".withUniqueGeneratedKeys)
}
result.compile.drain.transact(transactor)
不幸的是,文件系统是分布式的,这意味着单个操作很慢,但它允许同时进行多个操作。
所以我的问题是,如何在此处并行执行文件系统删除操作?
是的,你可以。只需使用适当的组合器而不是 for
语法。
val result =
(fr""" select path from files""")
.query[String]
.stream
.parEvalMapUnordered(maxConcurrent = 64) { deletePath =>
AsyncConnectionIO.liftIO(File(deletePath).delete()) >>
sql"delete from files where path = ${deletePath}".withUniqueGeneratedKeys
}
result.compile.drain.transact(transactor)
记得将 maxConcurrent
参数更改为对您的用例有意义的参数。
(我无法测试代码,所以可能有一些错别字)