如何使用 slick-streaming 和 akka-streaming 将一些记录从 TableA 复制到 TableB

How to copy some records from TableA to TableB with slick-streaming and akka-streaming

有两个表 TableATableB

我需要将一些记录从 TableA 复制到 TableB。我使用 slick-3.0 并使用以下方式:

import akka.stream._
import akka.stream.scaladsl._
...

//{{ READ DATA FROM TABLE A
val q = TableA.filter(somePredicate).result
val source = Source.fromPublisher {
      db.stream(q.result).mapResult { r => 
        val record: RecordA = someTransformation(r) 
        record
      }
    }.grouped(50) // grouping because I want to write records in batch mode
//}}

//{{ WRITE DATA TO TABLE B
val f:Future[Done] = source.runWith(Sink.foreach { batch: Seq[RecordA] =>
      //TODO how to write batch to TableB asynchronously?
      val insertAction = TableB ++= batch  // insert batch to table
      val fInsert: Future[_] = db.run(insertAction)
      Await.result(fInsert, ...)           // #1 this works only with blocking
})
//}}

但我遇到了一个问题 - 如何异步将批处理写入 TableB(请参阅 TODO)。现在上面的代码只适用于阻塞内部未来(见#1 评论)。有异步执行该任务的正确方法吗?

使用 mapAsync 它期望返回一个未来,并在下一阶段公开 "unwrapped" 结果。

source.mapAsync(4){batch: Seq[RecordA] =>
      val insertAction = TableB ++= batch  // insert batch to table
      db.run(insertAction)
}).to(Sink.ignore).run