Slick 中如何使用反应流来插入数据
How are reactive streams used in Slick for inserting data
在 Slick's documentation 中,使用 Reactive Streams 的示例仅作为 DatabasePublisher 的一种方式来读取数据。但是,当您想根据插入率将数据库用作 Sink 和 backpressure 时会发生什么?
我寻找了等效的 DatabaseSubscriber 但它不存在。所以问题是,如果我有来源,请说:
val source = Source(0 to 100)
如何使用 Slick 创建一个 Sink,将这些值写入具有架构的 table:
create table NumberTable (value INT)
连续插入
最简单的方法是 inserts within a Sink.foreach
。
假设您已经使用了 schema code generation 并进一步假设您的 table 被命名为 "NumberTable"
//Tables file was auto-generated by the schema code generation
import Tables.{Numbertable, NumbertableRow}
val numberTableDB = Database forConfig "NumberTableConfig"
我们可以编写一个函数来执行插入
def insertIntoDb(num : Int) =
numberTableDB run (Numbertable += NumbertableRow(num))
并且该函数可以放在 Sink 中
val insertSink = Sink[Int] foreach insertIntoDb
Source(0 to 100) runWith insertSink
批量插入
您可以通过一次批处理 N 个插入来进一步扩展 Sink 方法:
def batchInsertIntoDb(nums : Seq[Int]) =
numberTableDB run (Numbertable ++= nums.map(NumbertableRow.apply))
val batchInsertSink = Sink[Seq[Int]] foreach batchInsertIntoDb
这个批处理的 Sink 可以由 Flow
进行批处理分组:
val batchSize = 10
Source(0 to 100).via(Flow[Int].grouped(batchSize))
.runWith(batchInsertSink)
尽管您可以使用 Sink.foreach
来实现此目的(如 Ramon 所述),但使用 [=14= 更安全且可能更快(通过 运行 并行插入) ] Flow
。使用 Sink.foreach
时您将面临的问题是它没有 return 值。通过 slicks db.run
方法 returns a Future
插入数据库,然后从 returned Future[Done]
中逃脱 returned Future[Done]
一旦 Sink.foreach
完成。
implicit val system = ActorSystem("system")
implicit val materializer = ActorMaterializer()
class Numbers(tag: Tag) extends Table[Int](tag, "NumberTable") {
def value = column[Int]("value")
def * = value
}
val numbers = TableQuery[Numbers]
val db = Database.forConfig("postgres")
Await.result(db.run(numbers.schema.create), Duration.Inf)
val streamFuture: Future[Done] = Source(0 to 100)
.runWith(Sink.foreach[Int] { (i: Int) =>
db.run(numbers += i).foreach(_ => println(s"stream 1 insert $i done"))
})
Await.result(streamFuture, Duration.Inf)
println("stream 1 done")
//// sample 1 output: ////
// stream 1 insert 1 done
// ...
// stream 1 insert 99 done
// stream 1 done <-- stream Future[Done] returned before inserts finished
// stream 1 insert 100 done
另一方面,def mapAsync[T](parallelism: Int)(f: Out ⇒ Future[T])
Flow
允许您 运行 通过并行参数并行插入,并接受来自上游输出值的函数到某些函数的未来类型。这与我们的 i => db.run(numbers += i)
函数相匹配。这个 Flow
的伟大之处在于它随后将这些 Futures
的结果提供给下游。
val streamFuture2: Future[Done] = Source(0 to 100)
.mapAsync(1) { (i: Int) =>
db.run(numbers += i).map { r => println(s"stream 2 insert $i done"); r }
}
.runWith(Sink.ignore)
Await.result(streamFuture2, Duration.Inf)
println("stream 2 done")
//// sample 2 output: ////
// stream 2 insert 1 done
// ...
// stream 2 insert 100 done
// stream 1 done <-- stream Future[Done] returned after inserts finished
为了证明这一点,您甚至可以 return 来自流的真实结果而不是 Future[Done]
(完成代表单位)。此流还将添加更高的并行度值和批处理以获得额外的性能。 *
val streamFuture3: Future[Int] = Source(0 to 100)
.via(Flow[Int].grouped(10)) // Batch in size 10
.mapAsync(2)((ints: Seq[Int]) => db.run(numbers ++= ints).map(_.getOrElse(0))) // Insert batches in parallel, return insert count
.runWith(Sink.fold(0)(_+_)) // count all inserts and return total
val rowsInserted = Await.result(streamFuture3, Duration.Inf)
println(s"stream 3 done, inserted $rowsInserted rows")
// sample 3 output:
// stream 3 done, inserted 101 rows
- 注意:对于这么小的数据集,您可能不会看到更好的性能,但是当我处理 1.7M 插入时,我能够在我的机器上获得最佳性能,批量大小为 1000,并且并行度值为 8,在本地使用 postgresql。这大约是 运行ning 并行的两倍。与往常一样,在处理性能时,您的结果可能会有所不同,您应该自己衡量。
我发现 Alpakka 的文档非常出色,并且它的 DSL 非常容易使用反应流。
这是 Slick 的文档:https://doc.akka.io/docs/alpakka/current/slick.html
插入示例:
Source(0 to 100)
.runWith(
// add an optional first argument to specify the parallelism factor (Int)
Slick.sink(value => sqlu"INSERT INTO NumberTable VALUES(${value})")
)
在 Slick's documentation 中,使用 Reactive Streams 的示例仅作为 DatabasePublisher 的一种方式来读取数据。但是,当您想根据插入率将数据库用作 Sink 和 backpressure 时会发生什么?
我寻找了等效的 DatabaseSubscriber 但它不存在。所以问题是,如果我有来源,请说:
val source = Source(0 to 100)
如何使用 Slick 创建一个 Sink,将这些值写入具有架构的 table:
create table NumberTable (value INT)
连续插入
最简单的方法是 inserts within a Sink.foreach
。
假设您已经使用了 schema code generation 并进一步假设您的 table 被命名为 "NumberTable"
//Tables file was auto-generated by the schema code generation
import Tables.{Numbertable, NumbertableRow}
val numberTableDB = Database forConfig "NumberTableConfig"
我们可以编写一个函数来执行插入
def insertIntoDb(num : Int) =
numberTableDB run (Numbertable += NumbertableRow(num))
并且该函数可以放在 Sink 中
val insertSink = Sink[Int] foreach insertIntoDb
Source(0 to 100) runWith insertSink
批量插入
您可以通过一次批处理 N 个插入来进一步扩展 Sink 方法:
def batchInsertIntoDb(nums : Seq[Int]) =
numberTableDB run (Numbertable ++= nums.map(NumbertableRow.apply))
val batchInsertSink = Sink[Seq[Int]] foreach batchInsertIntoDb
这个批处理的 Sink 可以由 Flow
进行批处理分组:
val batchSize = 10
Source(0 to 100).via(Flow[Int].grouped(batchSize))
.runWith(batchInsertSink)
尽管您可以使用 Sink.foreach
来实现此目的(如 Ramon 所述),但使用 [=14= 更安全且可能更快(通过 运行 并行插入) ] Flow
。使用 Sink.foreach
时您将面临的问题是它没有 return 值。通过 slicks db.run
方法 returns a Future
插入数据库,然后从 returned Future[Done]
中逃脱 returned Future[Done]
一旦 Sink.foreach
完成。
implicit val system = ActorSystem("system")
implicit val materializer = ActorMaterializer()
class Numbers(tag: Tag) extends Table[Int](tag, "NumberTable") {
def value = column[Int]("value")
def * = value
}
val numbers = TableQuery[Numbers]
val db = Database.forConfig("postgres")
Await.result(db.run(numbers.schema.create), Duration.Inf)
val streamFuture: Future[Done] = Source(0 to 100)
.runWith(Sink.foreach[Int] { (i: Int) =>
db.run(numbers += i).foreach(_ => println(s"stream 1 insert $i done"))
})
Await.result(streamFuture, Duration.Inf)
println("stream 1 done")
//// sample 1 output: ////
// stream 1 insert 1 done
// ...
// stream 1 insert 99 done
// stream 1 done <-- stream Future[Done] returned before inserts finished
// stream 1 insert 100 done
另一方面,def mapAsync[T](parallelism: Int)(f: Out ⇒ Future[T])
Flow
允许您 运行 通过并行参数并行插入,并接受来自上游输出值的函数到某些函数的未来类型。这与我们的 i => db.run(numbers += i)
函数相匹配。这个 Flow
的伟大之处在于它随后将这些 Futures
的结果提供给下游。
val streamFuture2: Future[Done] = Source(0 to 100)
.mapAsync(1) { (i: Int) =>
db.run(numbers += i).map { r => println(s"stream 2 insert $i done"); r }
}
.runWith(Sink.ignore)
Await.result(streamFuture2, Duration.Inf)
println("stream 2 done")
//// sample 2 output: ////
// stream 2 insert 1 done
// ...
// stream 2 insert 100 done
// stream 1 done <-- stream Future[Done] returned after inserts finished
为了证明这一点,您甚至可以 return 来自流的真实结果而不是 Future[Done]
(完成代表单位)。此流还将添加更高的并行度值和批处理以获得额外的性能。 *
val streamFuture3: Future[Int] = Source(0 to 100)
.via(Flow[Int].grouped(10)) // Batch in size 10
.mapAsync(2)((ints: Seq[Int]) => db.run(numbers ++= ints).map(_.getOrElse(0))) // Insert batches in parallel, return insert count
.runWith(Sink.fold(0)(_+_)) // count all inserts and return total
val rowsInserted = Await.result(streamFuture3, Duration.Inf)
println(s"stream 3 done, inserted $rowsInserted rows")
// sample 3 output:
// stream 3 done, inserted 101 rows
- 注意:对于这么小的数据集,您可能不会看到更好的性能,但是当我处理 1.7M 插入时,我能够在我的机器上获得最佳性能,批量大小为 1000,并且并行度值为 8,在本地使用 postgresql。这大约是 运行ning 并行的两倍。与往常一样,在处理性能时,您的结果可能会有所不同,您应该自己衡量。
我发现 Alpakka 的文档非常出色,并且它的 DSL 非常容易使用反应流。
这是 Slick 的文档:https://doc.akka.io/docs/alpakka/current/slick.html
插入示例:
Source(0 to 100)
.runWith(
// add an optional first argument to specify the parallelism factor (Int)
Slick.sink(value => sqlu"INSERT INTO NumberTable VALUES(${value})")
)