使用 Akka Stream 从数据库流式传输记录
Stream records from DataBase using Akka Stream
我有一个使用 Akka 的系统,该系统当前通过消息队列处理传入的流数据。当记录到达然后被处理时,mq 被确认并且记录被传递以在系统内进一步处理。
现在我想添加对使用数据库作为输入的支持。
输入源能够处理 DB 的方法是什么(应该以接收器可以处理的速度流式传输 > 100M 记录 - 所以我假设 reactive/akka-streams?)?
Slick 库
Slick streaming 通常是这样做的。
稍微扩展精巧的文档以包含 akka 流:
//SELECT Name from Coffees
val q = for (c <- coffees) yield c.name
val action = q.result
type Name = String
val databasePublisher : DatabasePublisher[Name] = db stream action
import akka.stream.scaladsl.Source
val akkaSourceFromSlick : Source[Name, _] = Source fromPublisher databasePublisher
现在 akkaSourceFromSlick
就像任何其他 akka 流 Source
。
"Old School" 结果集
也可以使用普通的 ResultSet
,而不是 slick,作为 akka 流的 "engine"。我们将利用流 Source
可以从 Iterator
.
实例化这一事实
首先使用标准 jdbc 技术创建结果集:
import java.sql._
val resultSetGenerator : () => Try[ResultSet] = Try {
val statement : Statement = ???
statement executeQuery "SELECT Name from Coffees"
}
当然,所有的 ResultSet 实例都必须将光标移动到第一行之前:
val adjustResultSetBeforeFirst : (ResultSet) => Try[ResultSet] =
(resultSet) => Try(resultSet.beforeFirst()) map (_ => resultSet)
一旦我们开始遍历行,我们就必须从正确的列中提取值:
val getNameFromResultSet : ResultSet => Name = _ getString "Name"
现在我们可以实现 Iterator
接口来从 ResultSet 创建 Iterator[Name]
:
val convertResultSetToNameIterator : ResultSet => Iterator[Name] =
(resultSet) => new Iterator[Try[Name]] {
override def hasNext : Boolean = resultSet.next
override def next() : Try[Name] = Try(getNameFromResultSet(resultSet))
} flatMap (_.toOption)
最后,将所有部分粘合在一起以创建我们需要传递给 Source.fromIterator
的函数:
val resultSetGenToNameIterator : (() => Try[ResultSet]) => () => Iterator[Name] =
(_ : () => Try[ResultSet])
.andThen(_ flatMap adjustResultSetBeforeFirst)
.andThen(_ map convertResultSetToNameIterator)
.andThen(_ getOrElse Iterator.empty)
此迭代器现在可以提供源:
val akkaSourceFromResultSet : Source[Name, _] =
Source fromIterator resultSetGenToNameIterator(resultSetGenerator)
此实现一直反应到数据库。由于 ResultSet 一次预取有限数量的行,因此数据只会在流 Sink
信号需求时通过数据库从硬盘驱动器中取出。
我发现 Alpakka 文档非常出色,并且比 Java Publisher 界面更容易使用反应流。
Alpakka 项目是一项开源计划,旨在为 Java 和 Scala 实施流感知、响应式集成管道。它建立在 Akka Streams 之上,从头开始设计以理解本地流式传输,并为反应式和面向流的编程提供 DSL,并内置了对背压的支持
带有 Slick 的 Alpakka 文档:https://doc.akka.io/docs/alpakka/current/slick.html
阿尔帕卡 Github: https://github.com/akka/alpakka
我有一个使用 Akka 的系统,该系统当前通过消息队列处理传入的流数据。当记录到达然后被处理时,mq 被确认并且记录被传递以在系统内进一步处理。
现在我想添加对使用数据库作为输入的支持。
输入源能够处理 DB 的方法是什么(应该以接收器可以处理的速度流式传输 > 100M 记录 - 所以我假设 reactive/akka-streams?)?
Slick 库
Slick streaming 通常是这样做的。
稍微扩展精巧的文档以包含 akka 流:
//SELECT Name from Coffees
val q = for (c <- coffees) yield c.name
val action = q.result
type Name = String
val databasePublisher : DatabasePublisher[Name] = db stream action
import akka.stream.scaladsl.Source
val akkaSourceFromSlick : Source[Name, _] = Source fromPublisher databasePublisher
现在 akkaSourceFromSlick
就像任何其他 akka 流 Source
。
"Old School" 结果集
也可以使用普通的 ResultSet
,而不是 slick,作为 akka 流的 "engine"。我们将利用流 Source
可以从 Iterator
.
首先使用标准 jdbc 技术创建结果集:
import java.sql._
val resultSetGenerator : () => Try[ResultSet] = Try {
val statement : Statement = ???
statement executeQuery "SELECT Name from Coffees"
}
当然,所有的 ResultSet 实例都必须将光标移动到第一行之前:
val adjustResultSetBeforeFirst : (ResultSet) => Try[ResultSet] =
(resultSet) => Try(resultSet.beforeFirst()) map (_ => resultSet)
一旦我们开始遍历行,我们就必须从正确的列中提取值:
val getNameFromResultSet : ResultSet => Name = _ getString "Name"
现在我们可以实现 Iterator
接口来从 ResultSet 创建 Iterator[Name]
:
val convertResultSetToNameIterator : ResultSet => Iterator[Name] =
(resultSet) => new Iterator[Try[Name]] {
override def hasNext : Boolean = resultSet.next
override def next() : Try[Name] = Try(getNameFromResultSet(resultSet))
} flatMap (_.toOption)
最后,将所有部分粘合在一起以创建我们需要传递给 Source.fromIterator
的函数:
val resultSetGenToNameIterator : (() => Try[ResultSet]) => () => Iterator[Name] =
(_ : () => Try[ResultSet])
.andThen(_ flatMap adjustResultSetBeforeFirst)
.andThen(_ map convertResultSetToNameIterator)
.andThen(_ getOrElse Iterator.empty)
此迭代器现在可以提供源:
val akkaSourceFromResultSet : Source[Name, _] =
Source fromIterator resultSetGenToNameIterator(resultSetGenerator)
此实现一直反应到数据库。由于 ResultSet 一次预取有限数量的行,因此数据只会在流 Sink
信号需求时通过数据库从硬盘驱动器中取出。
我发现 Alpakka 文档非常出色,并且比 Java Publisher 界面更容易使用反应流。
Alpakka 项目是一项开源计划,旨在为 Java 和 Scala 实施流感知、响应式集成管道。它建立在 Akka Streams 之上,从头开始设计以理解本地流式传输,并为反应式和面向流的编程提供 DSL,并内置了对背压的支持
带有 Slick 的 Alpakka 文档:https://doc.akka.io/docs/alpakka/current/slick.html
阿尔帕卡 Github: https://github.com/akka/alpakka