Slick 运行 方法返回的 Future 在插入行之前成功完成

Future returned by Slick run method completes successfully before rows have been inserted

我正在使用 Slick 和 PostgreSQL 进行存储的 Scala 应用程序。我有一个方法 foo 从 CSV 文件中读取数据并将大约 10,000 行插入数据库。在行插入操作的 Future 完成后,另一个方法 bar 被调用,它从数据库中检索这些行并对它们执行一些操作。这就是问题所在:实际上没有从数据库中检索到任何行,因为在 Future 完成时还没有插入任何行。

根据我在寻找答案和在官方文档中收集到的信息,Future 不应在成功执行插入语句之前完成。如果我将以下代码 Thread.sleep(30000) 添加到 bar,允许先执行插入语句,这些方法将提供预期的结果。现在,出于显而易见的原因,我宁愿不这样做,所以我正在寻找替代方案。

下图说明了调用初始方法 doStuff 后的程序流程:

doStuff 调用 foo 加载数据并将其存储在数据库中,然后返回 Future。然后在 doStuff 中映射这个 Future 并调用 barbar bar 从数据库中检索行并处理它们。但是,由于在调用 bar 的位置没有插入任何行,因此不会处理任何数据。

doStuff方法:

def doStuff(csvFile: File): Future[Unit] = {
  fooService.foo(csvFile)
    .map(_ => {
      csvFile.delete()
      barService.bar()
    })
}

foo方法:

def foo(file: File) Future[Unit] = {
  val reader = CSVReader.open(file)
  fooStorage.truncateFooData().map(_ => {
    val foos = for (line <- reader.iterator if
    line.head != "bad1" &&
      line.head !="bad2")
      yield parseFooData(line)
    fooStorage.saveFooDataBulk(foos.toSeq)
  })
}

我如何使用 Slick 插入行:

override def saveFooDataBulk(fooSeq: Seq[Foo]): Future[Seq[Foo]] =
  db.run(DBIO.seq(fooQuery ++= fooSeq)).map(_ => fooSeq)

我期待 bar 在所有行都被插入数据库后被调用,而不是更早,但是,目前 Slick 的 Future 完成得太快了。如果相关:当请求发送到 Akka Http 端点时,将调用 doStuff 方法。应用程序和数据库 运行 在两个不同的 docker 容器中。我做错了什么?

我也无法理解我一年半前选择的用户名现在是多么合适。

map 替换为 def foo 中的 flatMap。否则,它将开始一个 Future[Seq[Foo]],然后立即 return 一个 (),然后被你的 doStuff 丢弃。像这样:

fooStorage
  .truncateFooData()
  .flatMap(_ => {
    /* stuff... */
    fooStorage.saveFooDataBulk(foo.toSeq)
  })
  .map(_ => ())

我没有测试它,但无论如何,在另一个 Future.map 中间开始一些 Future,然后立即 returning 一个 ()感觉不太对。