使用 Akka Streams 2.4.2 和 Slick 3.0 从 postgres 读取
Reading from postgres using Akka Streams 2.4.2 and Slick 3.0
试用新创建的 Akka Streams。它似乎工作正常,除了一件小事 - 没有输出。
我有以下 table 定义:
case class my_stream(id: Int, value: String)
class Streams(tag: Tag) extends Table[my_stream](tag, "my_stream") {
def id = column[Int]("id")
def value = column[String]("value")
def * = (id, value) <> (my_stream.tupled, my_stream.unapply)
}
我正在尝试将 table 的内容输出到标准输出,如下所示:
def main(args: Array[String]) : Unit = {
implicit val system = ActorSystem("Subscriber")
implicit val materializer = ActorMaterializer()
val strm = TableQuery[Streams]
val db = Database.forConfig("pg-postgres")
try{
var src = Source.fromPublisher(db.stream(strm.result))
src.runForeach(r => println(s"${r.id},${r.value}"))(materializer)
} finally {
system.shutdown
db.close
}
}
我已通过配置调试日志记录验证查询 运行。然而,我得到的只是:
08:59:24.099 [main] INFO com.zaxxer.hikari.HikariDataSource - pg-postgres - is starting.
08:59:24.428 [main] INFO com.zaxxer.hikari.pool.HikariPool - pg-postgres - is closing down.
最终使用了@ViktorKlang 的回答,只是用 Await.result
包裹了 运行。我还找到了一个替代答案 in the docs,它演示了如何使用反应流发布者和订阅者接口:
stream
方法returns一个DatabasePublisher[T]
和Source.fromPublisher
returns一个Source[T, NotUsed]
。这意味着您必须附加订阅者而不是使用 runForEach
- 根据 release notes NotUsed
是 Unit
的替代品。这意味着什么都不会传递给 Sink
.
由于 Slick 实现了反应流接口而不是 Akka Stream 接口,您需要使用 fromPublisher
和 fromSubscriber
集成点。这意味着您需要实现 org.reactivestreams.Subscriber[T]
接口。
这是一个快速而肮脏的 Subscriber[T]
实现,它只调用 println
:
class MyStreamWriter extends org.reactivestreams.Subscriber[my_stream] {
private var sub : Option[Subscription] = None;
override def onNext(t: my_stream): Unit = {
println(t.value)
if(sub.nonEmpty) sub.head.request(1)
}
override def onError(throwable: Throwable): Unit = {
println(throwable.getMessage)
}
override def onSubscribe(subscription: Subscription): Unit = {
sub = Some(subscription)
sub.head.request(1)
}
override def onComplete(): Unit = {
println("ALL DONE!")
}
}
您需要确保在 onSubscribe
中调用 Subscription.request(Long)
方法,然后在 onNext
中请求数据,否则将不会发送任何数据,否则您将无法获得完整数据结果集。
下面是你如何使用它:
def main(args: Array[String]) : Unit = {
implicit val system = ActorSystem("Subscriber")
implicit val materializer = ActorMaterializer()
val strm = TableQuery[Streams]
val db = Database.forConfig("pg-postgres")
try{
val src = Source.fromPublisher(db.stream(strm.result))
val flow = src.to(Sink.fromSubscriber(new MyStreamWriter()))
flow.run()
} finally {
system.shutdown
db.close
}
}
我仍在努力解决这个问题,因此欢迎任何反馈。谢谢!
原因是 Akka Streams 是异步的,runForeach
returns 流完成后将完成的 Future,但未处理 Future,因此 system.shutdown
和 db.close
立即执行,而不是在流完成后执行。
以防万一它可以帮助任何搜索同样问题的人,但在 MySQL 中,请考虑您应该启用驱动程序流支持 "manually":
def enableStream(statement: java.sql.Statement): Unit = {
statement match {
case s: com.mysql.jdbc.StatementImpl => s.enableStreamingResults()
case _ =>
}
}
val publisher = sourceDb.stream(query.result.withStatementParameters(statementInit = enableStream))
来源:http://www.slideshare.net/kazukinegoro5/akka-streams-100-scalamatsuri
试用新创建的 Akka Streams。它似乎工作正常,除了一件小事 - 没有输出。
我有以下 table 定义:
case class my_stream(id: Int, value: String)
class Streams(tag: Tag) extends Table[my_stream](tag, "my_stream") {
def id = column[Int]("id")
def value = column[String]("value")
def * = (id, value) <> (my_stream.tupled, my_stream.unapply)
}
我正在尝试将 table 的内容输出到标准输出,如下所示:
def main(args: Array[String]) : Unit = {
implicit val system = ActorSystem("Subscriber")
implicit val materializer = ActorMaterializer()
val strm = TableQuery[Streams]
val db = Database.forConfig("pg-postgres")
try{
var src = Source.fromPublisher(db.stream(strm.result))
src.runForeach(r => println(s"${r.id},${r.value}"))(materializer)
} finally {
system.shutdown
db.close
}
}
我已通过配置调试日志记录验证查询 运行。然而,我得到的只是:
08:59:24.099 [main] INFO com.zaxxer.hikari.HikariDataSource - pg-postgres - is starting.
08:59:24.428 [main] INFO com.zaxxer.hikari.pool.HikariPool - pg-postgres - is closing down.
最终使用了@ViktorKlang 的回答,只是用 Await.result
包裹了 运行。我还找到了一个替代答案 in the docs,它演示了如何使用反应流发布者和订阅者接口:
stream
方法returns一个DatabasePublisher[T]
和Source.fromPublisher
returns一个Source[T, NotUsed]
。这意味着您必须附加订阅者而不是使用 runForEach
- 根据 release notes NotUsed
是 Unit
的替代品。这意味着什么都不会传递给 Sink
.
由于 Slick 实现了反应流接口而不是 Akka Stream 接口,您需要使用 fromPublisher
和 fromSubscriber
集成点。这意味着您需要实现 org.reactivestreams.Subscriber[T]
接口。
这是一个快速而肮脏的 Subscriber[T]
实现,它只调用 println
:
class MyStreamWriter extends org.reactivestreams.Subscriber[my_stream] {
private var sub : Option[Subscription] = None;
override def onNext(t: my_stream): Unit = {
println(t.value)
if(sub.nonEmpty) sub.head.request(1)
}
override def onError(throwable: Throwable): Unit = {
println(throwable.getMessage)
}
override def onSubscribe(subscription: Subscription): Unit = {
sub = Some(subscription)
sub.head.request(1)
}
override def onComplete(): Unit = {
println("ALL DONE!")
}
}
您需要确保在 onSubscribe
中调用 Subscription.request(Long)
方法,然后在 onNext
中请求数据,否则将不会发送任何数据,否则您将无法获得完整数据结果集。
下面是你如何使用它:
def main(args: Array[String]) : Unit = {
implicit val system = ActorSystem("Subscriber")
implicit val materializer = ActorMaterializer()
val strm = TableQuery[Streams]
val db = Database.forConfig("pg-postgres")
try{
val src = Source.fromPublisher(db.stream(strm.result))
val flow = src.to(Sink.fromSubscriber(new MyStreamWriter()))
flow.run()
} finally {
system.shutdown
db.close
}
}
我仍在努力解决这个问题,因此欢迎任何反馈。谢谢!
原因是 Akka Streams 是异步的,runForeach
returns 流完成后将完成的 Future,但未处理 Future,因此 system.shutdown
和 db.close
立即执行,而不是在流完成后执行。
以防万一它可以帮助任何搜索同样问题的人,但在 MySQL 中,请考虑您应该启用驱动程序流支持 "manually":
def enableStream(statement: java.sql.Statement): Unit = {
statement match {
case s: com.mysql.jdbc.StatementImpl => s.enableStreamingResults()
case _ =>
}
}
val publisher = sourceDb.stream(query.result.withStatementParameters(statementInit = enableStream))
来源:http://www.slideshare.net/kazukinegoro5/akka-streams-100-scalamatsuri