使用 Slick 监听 PostgreSQL NOTIFY 事件

Listen to PostgreSQL NOTIFY events with Slick

我可以使用 Slick / Play Framework (Scala) 来监听 PostgreSQL NOTIFY 语句吗?

我想做类似的事情:

http://bjorngylling.com/2011-04-13/postgres-listen-notify-with-node-js.html

我认为 Slick 不支持 PostgreSQL's NOTIFY, but the postgresql-async 库。可以使用后者创建一个 Akka Streams Source 并将其合并到一个将数据库通知流式传输到客户端的 Play 端点中:

package controllers

import javax.inject.{Inject, Singleton}

import akka.actor._
import akka.stream._
import akka.stream.scaladsl._

import com.github.mauricio.async.db.postgresql.PostgreSQLConnection
import com.github.mauricio.async.db.postgresql.util.URLParser
import com.github.mauricio.async.db.util.ExecutorServiceUtils.CachedExecutionContext

import play.api.Logger
import play.api.http.ContentTypes
import play.api.libs.EventSource
import play.api.mvc._

import scala.concurrent.duration._
import scala.concurrent.Await

@Singleton
class DbNotificationController @Inject()(cc: ControllerComponents,
                                         materializer: Materializer)
  extends AbstractController(cc) {

  implicit val mat = materializer

  val configuration = URLParser.parse("jdbc:postgresql://localhost:5233/my_db?user=dbuser&password=pwd")
  val connection = new PostgreSQLConnection(configuration)
  Await.result(connection.connect, 5 seconds)

  val (actor, dbSource) =
    Source.actorRef[String](Int.MaxValue, OverflowStrategy.dropNew)
          .toMat(BroadcastHub.sink[String])(Keep.both)
          .run()

  connection.sendQuery("LISTEN my_channel")
  connection.registerNotifyListener { message =>
    val msg = message.payload
    Logger.debug(s"Sending the payload: $msg")
    actor ! msg
  }

  def index() = Action {
    Ok(views.html.scaladbnotification())
  }

  def streamDb() = Action {
    Ok.chunked(dbSource via EventSource.flow).as(ContentTypes.EVENT_STREAM)
  }
}

在上面的控制器中,当侦听器从数据库接收到通知时,通知中的有效负载(String)被记录并发送给参与者。发送到此参与者的消息提供给 streamDb 端点中使用的 Source。在将有效负载消息发送到客户端之前,它们被转换为 Play 的 EventSource class.


我从 Play streaming example application 中改编了 DbNotificationController,您可以用它来进行实验。如果您想这样做,显然您需要将 DbNotificationController 集成到该项目中:

  1. "com.github.mauricio" %% "postgresql-async" % "0.2.21" 添加到 build.sbt
  2. 根据需要设置 PostgreSQL,包括 NOTIFY,并根据您的配置调整控制器中的数据库 URL。
  3. DbNotificationController 复制并粘贴到 /app/controllers/
  4. 将以下文件(命名为 scaladbnotification.scala.html)复制到 app/views/:
@main {

    <h1>Server Sent Event from DB</h1>

    <h1 id="db"></h1>

    <p>
        DB events are pushed from the Server using a Server Sent Event connection.
    </p>

    <script type="text/javascript" charset="utf-8">
        if (!!window.EventSource) {
            var stringSource = new EventSource("@routes.DbNotificationController.streamDb()");
            stringSource.addEventListener('message', function(e) {
                $('#db').html(e.data.replace(/(\d)/g, '<span></span>'))
            });
        } else {
            $("#db").html("Sorry. This browser doesn't seem to support Server sent event. Check <a href='http://html5test.com/compare/feature/communication-eventSource.html'>html5test</a> for browser compatibility."); 
        }
    </script>    
}

  1. /conf/routes 文件中,添加以下行:
    GET /scala/dbNotification           controllers.DbNotificationController.index()
    GET /scala/dbNotification/liveDb    controllers.DbNotificationController.streamDb()
  1. 使用 sbt run 启动应用程序并在浏览器中导航到以下 URL:

    http://localhost:9000/scala/dbNotification