Scala Slick:永无止境的流

Scala Slick: Never ending stream

使用 Slick,您可以执行以下操作以从 table 生成结果流:

val q = for (e <- events) yield e.name
val p: DatabasePublisher[String] = db.stream(q.result)

p.foreach { s => println(s"Event: $s") }

这将打印 events table 中的所有事件并在最后一行后终止。

假设当新行被输入 events table 时,您可以通过某种方式得到通知,是否可以编写一个流,在事件被插入时连续输出事件? DB table.

的一种 tail -f

我认为 Slick 本身不会支持这个,但我认为应该可以使用 Akka 流来提供帮助。因此,如果您可以从 Slick Source 获取一些东西直到它为空,然后等待事件指示 table 中有更多数据,然后流式传输新数据。可能通过使用 ActorPublisher 来绑定此逻辑?

想知道是否有人在这方面有任何经验或建议?

关于 ActorPublisher 你是正确的 :) 这是一个使用 PostgreSQL 的简单例子,async DB driver and LISTEN/NOTIFY mechanism:

演员:

class PostgresListener extends ActorPublisher[String] {

  override def receive = {
    case _ ⇒
      val configuration = URLParser.parse(s"jdbc://postgresql://$host:$port/$db?user=$user&password=$password")
      val connection = new PostgreSQLConnection(configuration)
      Await.result(connection.connect, 5.seconds)

      connection.sendQuery(s"LISTEN $channel")
      connection.registerNotifyListener { message ⇒ onNext(message.payload) }
  }
}

服务:

def stream: Source[ServerSentEvent, Unit] = {
  val dataPublisherRef = Props[PostgresListener]
  val dataPublisher = ActorPublisher[String](dataPublisherRef)

  dataPublisherRef ! "go"

  Source(dataPublisher)
    .map(ServerSentEvent(_))
    .via(WithHeartbeats(10.second))
}

build.sbtlibraryDependencies:

"com.github.mauricio"  %% "postgresql-async"         % "0.2.18"

Postgres 触发器应该调用 select pg_notify('foo', 'payload')

据我所知,Slick不支持LISTEN

ActorPublisher has been deprecated since Akka 2.5.0. Here is an alternative that uses the postgresql-async library and creates a SourceQueue 演员内部:

import akka.actor._
import akka.stream._
import akka.stream.scaladsl._

import com.github.mauricio.async.db.postgresql.PostgreSQLConnection
import com.github.mauricio.async.db.postgresql.util.URLParser

import scala.concurrent.duration._
import scala.concurrent.Await

class DbActor(implicit materializer: ActorMaterializer) extends Actor with ActorLogging {
  private implicit val ec = context.system.dispatcher

  val queue =  
    Source.queue[String](Int.MaxValue, OverflowStrategy.backpressure)
      .to(Sink.foreach(println))
      .run()

  val configuration = URLParser.parse("jdbc:postgresql://localhost:5233/my_db?user=dbuser&password=pwd")
  val connection = new PostgreSQLConnection(configuration)
  Await.result(connection.connect, 5 seconds)

  connection.sendQuery("LISTEN my_channel")
  connection.registerNotifyListener { message =>
    val msg = message.payload
    log.debug("Sending the payload: {}", msg)
    self ! msg
  }

  def receive = {
    case payload: String =>
      queue.offer(payload).pipeTo(self)
    case QueueOfferResult.Dropped =>
      log.warning("Dropped a message.")
    case QueueOfferResult.Enqueued =>
      log.debug("Enqueued a message.")
    case QueueOfferResult.Failure(t) =>
      log.error("Stream failed: {}", t.getMessage)
    case QueueOfferResult.QueueClosed =>
      log.debug("Stream closed.")
  }
}

上面的代码只是在 PostgreSQL 发生时打印通知;您可以将 Sink.foreach(println) 替换为另一个 Sink。给运行吧:

import akka.actor._
import akka.stream.ActorMaterializer

object Example extends App {
  implicit val system = ActorSystem()
  implicit val materializer = ActorMaterializer()
  system.actorOf(Props(classOf[DbActor], materializer))
}