如何使用 slick-streaming 和 akka-streaming 将一些记录从 TableA 复制到 TableB
How to copy some records from TableA to TableB with slick-streaming and akka-streaming
有两个表 TableA
和 TableB
。
我需要将一些记录从 TableA
复制到 TableB
。我使用 slick-3.0
并使用以下方式:
import akka.stream._
import akka.stream.scaladsl._
...
//{{ READ DATA FROM TABLE A
val q = TableA.filter(somePredicate).result
val source = Source.fromPublisher {
db.stream(q.result).mapResult { r =>
val record: RecordA = someTransformation(r)
record
}
}.grouped(50) // grouping because I want to write records in batch mode
//}}
//{{ WRITE DATA TO TABLE B
val f:Future[Done] = source.runWith(Sink.foreach { batch: Seq[RecordA] =>
//TODO how to write batch to TableB asynchronously?
val insertAction = TableB ++= batch // insert batch to table
val fInsert: Future[_] = db.run(insertAction)
Await.result(fInsert, ...) // #1 this works only with blocking
})
//}}
但我遇到了一个问题 - 如何异步将批处理写入 TableB
(请参阅 TODO)。现在上面的代码只适用于阻塞内部未来(见#1 评论)。有异步执行该任务的正确方法吗?
使用 mapAsync
它期望返回一个未来,并在下一阶段公开 "unwrapped" 结果。
source.mapAsync(4){batch: Seq[RecordA] =>
val insertAction = TableB ++= batch // insert batch to table
db.run(insertAction)
}).to(Sink.ignore).run
有两个表 TableA
和 TableB
。
我需要将一些记录从 TableA
复制到 TableB
。我使用 slick-3.0
并使用以下方式:
import akka.stream._
import akka.stream.scaladsl._
...
//{{ READ DATA FROM TABLE A
val q = TableA.filter(somePredicate).result
val source = Source.fromPublisher {
db.stream(q.result).mapResult { r =>
val record: RecordA = someTransformation(r)
record
}
}.grouped(50) // grouping because I want to write records in batch mode
//}}
//{{ WRITE DATA TO TABLE B
val f:Future[Done] = source.runWith(Sink.foreach { batch: Seq[RecordA] =>
//TODO how to write batch to TableB asynchronously?
val insertAction = TableB ++= batch // insert batch to table
val fInsert: Future[_] = db.run(insertAction)
Await.result(fInsert, ...) // #1 this works only with blocking
})
//}}
但我遇到了一个问题 - 如何异步将批处理写入 TableB
(请参阅 TODO)。现在上面的代码只适用于阻塞内部未来(见#1 评论)。有异步执行该任务的正确方法吗?
使用 mapAsync
它期望返回一个未来,并在下一阶段公开 "unwrapped" 结果。
source.mapAsync(4){batch: Seq[RecordA] =>
val insertAction = TableB ++= batch // insert batch to table
db.run(insertAction)
}).to(Sink.ignore).run