Slick 运行 方法返回的 Future 在插入行之前成功完成
Future returned by Slick run method completes successfully before rows have been inserted
我正在使用 Slick 和 PostgreSQL 进行存储的 Scala 应用程序。我有一个方法 foo
从 CSV 文件中读取数据并将大约 10,000 行插入数据库。在行插入操作的 Future 完成后,另一个方法 bar
被调用,它从数据库中检索这些行并对它们执行一些操作。这就是问题所在:实际上没有从数据库中检索到任何行,因为在 Future 完成时还没有插入任何行。
根据我在寻找答案和在官方文档中收集到的信息,Future 不应在成功执行插入语句之前完成。如果我将以下代码 Thread.sleep(30000)
添加到 bar
,允许先执行插入语句,这些方法将提供预期的结果。现在,出于显而易见的原因,我宁愿不这样做,所以我正在寻找替代方案。
下图说明了调用初始方法 doStuff
后的程序流程:
doStuff
调用 foo
加载数据并将其存储在数据库中,然后返回 Future。然后在 doStuff
中映射这个 Future 并调用 bar
。 bar
bar 从数据库中检索行并处理它们。但是,由于在调用 bar
的位置没有插入任何行,因此不会处理任何数据。
doStuff
方法:
def doStuff(csvFile: File): Future[Unit] = {
fooService.foo(csvFile)
.map(_ => {
csvFile.delete()
barService.bar()
})
}
foo
方法:
def foo(file: File) Future[Unit] = {
val reader = CSVReader.open(file)
fooStorage.truncateFooData().map(_ => {
val foos = for (line <- reader.iterator if
line.head != "bad1" &&
line.head !="bad2")
yield parseFooData(line)
fooStorage.saveFooDataBulk(foos.toSeq)
})
}
我如何使用 Slick 插入行:
override def saveFooDataBulk(fooSeq: Seq[Foo]): Future[Seq[Foo]] =
db.run(DBIO.seq(fooQuery ++= fooSeq)).map(_ => fooSeq)
我期待 bar
在所有行都被插入数据库后被调用,而不是更早,但是,目前 Slick 的 Future 完成得太快了。如果相关:当请求发送到 Akka Http 端点时,将调用 doStuff
方法。应用程序和数据库 运行 在两个不同的 docker 容器中。我做错了什么?
我也无法理解我一年半前选择的用户名现在是多么合适。
将 map
替换为 def foo
中的 flatMap
。否则,它将开始一个 Future[Seq[Foo]]
,然后立即 return 一个 ()
,然后被你的 doStuff
丢弃。像这样:
fooStorage
.truncateFooData()
.flatMap(_ => {
/* stuff... */
fooStorage.saveFooDataBulk(foo.toSeq)
})
.map(_ => ())
我没有测试它,但无论如何,在另一个 Future.map
中间开始一些 Future
,然后立即 returning 一个 ()
感觉不太对。
我正在使用 Slick 和 PostgreSQL 进行存储的 Scala 应用程序。我有一个方法 foo
从 CSV 文件中读取数据并将大约 10,000 行插入数据库。在行插入操作的 Future 完成后,另一个方法 bar
被调用,它从数据库中检索这些行并对它们执行一些操作。这就是问题所在:实际上没有从数据库中检索到任何行,因为在 Future 完成时还没有插入任何行。
根据我在寻找答案和在官方文档中收集到的信息,Future 不应在成功执行插入语句之前完成。如果我将以下代码 Thread.sleep(30000)
添加到 bar
,允许先执行插入语句,这些方法将提供预期的结果。现在,出于显而易见的原因,我宁愿不这样做,所以我正在寻找替代方案。
下图说明了调用初始方法 doStuff
后的程序流程:
doStuff
调用 foo
加载数据并将其存储在数据库中,然后返回 Future。然后在 doStuff
中映射这个 Future 并调用 bar
。 bar
bar 从数据库中检索行并处理它们。但是,由于在调用 bar
的位置没有插入任何行,因此不会处理任何数据。
doStuff
方法:
def doStuff(csvFile: File): Future[Unit] = {
fooService.foo(csvFile)
.map(_ => {
csvFile.delete()
barService.bar()
})
}
foo
方法:
def foo(file: File) Future[Unit] = {
val reader = CSVReader.open(file)
fooStorage.truncateFooData().map(_ => {
val foos = for (line <- reader.iterator if
line.head != "bad1" &&
line.head !="bad2")
yield parseFooData(line)
fooStorage.saveFooDataBulk(foos.toSeq)
})
}
我如何使用 Slick 插入行:
override def saveFooDataBulk(fooSeq: Seq[Foo]): Future[Seq[Foo]] =
db.run(DBIO.seq(fooQuery ++= fooSeq)).map(_ => fooSeq)
我期待 bar
在所有行都被插入数据库后被调用,而不是更早,但是,目前 Slick 的 Future 完成得太快了。如果相关:当请求发送到 Akka Http 端点时,将调用 doStuff
方法。应用程序和数据库 运行 在两个不同的 docker 容器中。我做错了什么?
我也无法理解我一年半前选择的用户名现在是多么合适。
将 map
替换为 def foo
中的 flatMap
。否则,它将开始一个 Future[Seq[Foo]]
,然后立即 return 一个 ()
,然后被你的 doStuff
丢弃。像这样:
fooStorage
.truncateFooData()
.flatMap(_ => {
/* stuff... */
fooStorage.saveFooDataBulk(foo.toSeq)
})
.map(_ => ())
我没有测试它,但无论如何,在另一个 Future.map
中间开始一些 Future
,然后立即 returning 一个 ()
感觉不太对。