Scala Slick:永无止境的流
Scala Slick: Never ending stream
使用 Slick,您可以执行以下操作以从 table 生成结果流:
val q = for (e <- events) yield e.name
val p: DatabasePublisher[String] = db.stream(q.result)
p.foreach { s => println(s"Event: $s") }
这将打印 events
table 中的所有事件并在最后一行后终止。
假设当新行被输入 events
table 时,您可以通过某种方式得到通知,是否可以编写一个流,在事件被插入时连续输出事件? DB table.
的一种 tail -f
我认为 Slick 本身不会支持这个,但我认为应该可以使用 Akka 流来提供帮助。因此,如果您可以从 Slick Source 获取一些东西直到它为空,然后等待事件指示 table 中有更多数据,然后流式传输新数据。可能通过使用 ActorPublisher
来绑定此逻辑?
想知道是否有人在这方面有任何经验或建议?
关于 ActorPublisher
你是正确的 :) 这是一个使用 PostgreSQL 的简单例子,async DB driver and LISTEN/NOTIFY mechanism:
演员:
class PostgresListener extends ActorPublisher[String] {
override def receive = {
case _ ⇒
val configuration = URLParser.parse(s"jdbc://postgresql://$host:$port/$db?user=$user&password=$password")
val connection = new PostgreSQLConnection(configuration)
Await.result(connection.connect, 5.seconds)
connection.sendQuery(s"LISTEN $channel")
connection.registerNotifyListener { message ⇒ onNext(message.payload) }
}
}
服务:
def stream: Source[ServerSentEvent, Unit] = {
val dataPublisherRef = Props[PostgresListener]
val dataPublisher = ActorPublisher[String](dataPublisherRef)
dataPublisherRef ! "go"
Source(dataPublisher)
.map(ServerSentEvent(_))
.via(WithHeartbeats(10.second))
}
build.sbt
在 libraryDependencies
:
"com.github.mauricio" %% "postgresql-async" % "0.2.18"
Postgres 触发器应该调用 select pg_notify('foo', 'payload')
据我所知,Slick不支持LISTEN
。
ActorPublisher
has been deprecated since Akka 2.5.0. Here is an alternative that uses the postgresql-async library and creates a SourceQueue
演员内部:
import akka.actor._
import akka.stream._
import akka.stream.scaladsl._
import com.github.mauricio.async.db.postgresql.PostgreSQLConnection
import com.github.mauricio.async.db.postgresql.util.URLParser
import scala.concurrent.duration._
import scala.concurrent.Await
class DbActor(implicit materializer: ActorMaterializer) extends Actor with ActorLogging {
private implicit val ec = context.system.dispatcher
val queue =
Source.queue[String](Int.MaxValue, OverflowStrategy.backpressure)
.to(Sink.foreach(println))
.run()
val configuration = URLParser.parse("jdbc:postgresql://localhost:5233/my_db?user=dbuser&password=pwd")
val connection = new PostgreSQLConnection(configuration)
Await.result(connection.connect, 5 seconds)
connection.sendQuery("LISTEN my_channel")
connection.registerNotifyListener { message =>
val msg = message.payload
log.debug("Sending the payload: {}", msg)
self ! msg
}
def receive = {
case payload: String =>
queue.offer(payload).pipeTo(self)
case QueueOfferResult.Dropped =>
log.warning("Dropped a message.")
case QueueOfferResult.Enqueued =>
log.debug("Enqueued a message.")
case QueueOfferResult.Failure(t) =>
log.error("Stream failed: {}", t.getMessage)
case QueueOfferResult.QueueClosed =>
log.debug("Stream closed.")
}
}
上面的代码只是在 PostgreSQL 发生时打印通知;您可以将 Sink.foreach(println)
替换为另一个 Sink
。给运行吧:
import akka.actor._
import akka.stream.ActorMaterializer
object Example extends App {
implicit val system = ActorSystem()
implicit val materializer = ActorMaterializer()
system.actorOf(Props(classOf[DbActor], materializer))
}
使用 Slick,您可以执行以下操作以从 table 生成结果流:
val q = for (e <- events) yield e.name
val p: DatabasePublisher[String] = db.stream(q.result)
p.foreach { s => println(s"Event: $s") }
这将打印 events
table 中的所有事件并在最后一行后终止。
假设当新行被输入 events
table 时,您可以通过某种方式得到通知,是否可以编写一个流,在事件被插入时连续输出事件? DB table.
tail -f
我认为 Slick 本身不会支持这个,但我认为应该可以使用 Akka 流来提供帮助。因此,如果您可以从 Slick Source 获取一些东西直到它为空,然后等待事件指示 table 中有更多数据,然后流式传输新数据。可能通过使用 ActorPublisher
来绑定此逻辑?
想知道是否有人在这方面有任何经验或建议?
关于 ActorPublisher
你是正确的 :) 这是一个使用 PostgreSQL 的简单例子,async DB driver and LISTEN/NOTIFY mechanism:
演员:
class PostgresListener extends ActorPublisher[String] {
override def receive = {
case _ ⇒
val configuration = URLParser.parse(s"jdbc://postgresql://$host:$port/$db?user=$user&password=$password")
val connection = new PostgreSQLConnection(configuration)
Await.result(connection.connect, 5.seconds)
connection.sendQuery(s"LISTEN $channel")
connection.registerNotifyListener { message ⇒ onNext(message.payload) }
}
}
服务:
def stream: Source[ServerSentEvent, Unit] = {
val dataPublisherRef = Props[PostgresListener]
val dataPublisher = ActorPublisher[String](dataPublisherRef)
dataPublisherRef ! "go"
Source(dataPublisher)
.map(ServerSentEvent(_))
.via(WithHeartbeats(10.second))
}
build.sbt
在 libraryDependencies
:
"com.github.mauricio" %% "postgresql-async" % "0.2.18"
Postgres 触发器应该调用 select pg_notify('foo', 'payload')
据我所知,Slick不支持LISTEN
。
ActorPublisher
has been deprecated since Akka 2.5.0. Here is an alternative that uses the postgresql-async library and creates a SourceQueue
演员内部:
import akka.actor._
import akka.stream._
import akka.stream.scaladsl._
import com.github.mauricio.async.db.postgresql.PostgreSQLConnection
import com.github.mauricio.async.db.postgresql.util.URLParser
import scala.concurrent.duration._
import scala.concurrent.Await
class DbActor(implicit materializer: ActorMaterializer) extends Actor with ActorLogging {
private implicit val ec = context.system.dispatcher
val queue =
Source.queue[String](Int.MaxValue, OverflowStrategy.backpressure)
.to(Sink.foreach(println))
.run()
val configuration = URLParser.parse("jdbc:postgresql://localhost:5233/my_db?user=dbuser&password=pwd")
val connection = new PostgreSQLConnection(configuration)
Await.result(connection.connect, 5 seconds)
connection.sendQuery("LISTEN my_channel")
connection.registerNotifyListener { message =>
val msg = message.payload
log.debug("Sending the payload: {}", msg)
self ! msg
}
def receive = {
case payload: String =>
queue.offer(payload).pipeTo(self)
case QueueOfferResult.Dropped =>
log.warning("Dropped a message.")
case QueueOfferResult.Enqueued =>
log.debug("Enqueued a message.")
case QueueOfferResult.Failure(t) =>
log.error("Stream failed: {}", t.getMessage)
case QueueOfferResult.QueueClosed =>
log.debug("Stream closed.")
}
}
上面的代码只是在 PostgreSQL 发生时打印通知;您可以将 Sink.foreach(println)
替换为另一个 Sink
。给运行吧:
import akka.actor._
import akka.stream.ActorMaterializer
object Example extends App {
implicit val system = ActorSystem()
implicit val materializer = ActorMaterializer()
system.actorOf(Props(classOf[DbActor], materializer))
}