在 PGConnection.getNotifications 中获取大小
Fetch size in PGConnection.getNotifications
我的 postgresql 数据库中的一个函数在更新 table 时发送通知。
我正在通过 scalikejdbc 轮询 postgresql 数据库,以获取所有通知,然后对它们进行处理。
该过程解释 here 。一个典型的响应式系统 sql tables 更新。
我从 java.sql.Connection 获取 PGConnection。然后,我以这种方式收到通知:
val notifications = Option(pgConnection.getNotifications).getOrElse(Array[PGNotification]())
我试图通过将提取大小设置为 1000 并禁用自动提交来以 1000 块为单位获取通知。但是获取大小 属性 被忽略了。
知道我该怎么做吗?
我不想在我的通知数据集上的单个地图中处理数十万个通知。
pgConnection.getNotifications.size 可能很大,因此,此代码无法很好地扩展。
谢谢!!!
为了更好地扩展,请考虑使用 postgresql-async and Akka Streams: the former is a library that can obtain PostgreSQL notifications asynchronously, and the former is a Reactive Streams 提供背压的实现(这将避免分页的需要)。例如:
import akka.actor._
import akka.stream._
import akka.stream.scaladsl._
import com.github.mauricio.async.db.postgresql.PostgreSQLConnection
import com.github.mauricio.async.db.postgresql.util.URLParser
import scala.concurrent.duration._
import scala.concurrent.Await
class DbActor(implicit materializer: ActorMaterializer) extends Actor with ActorLogging {
private implicit val ec = context.system.dispatcher
val queue =
Source.queue[String](Int.MaxValue, OverflowStrategy.backpressure)
.to(Sink.foreach(println))
.run()
val configuration = URLParser.parse("jdbc:postgresql://localhost:5233/my_db?user=dbuser&password=pwd")
val connection = new PostgreSQLConnection(configuration)
Await.result(connection.connect, 5 seconds)
connection.sendQuery("LISTEN my_channel")
connection.registerNotifyListener { message =>
val msg = message.payload
log.debug("Sending the payload: {}", msg)
self ! msg
}
def receive = {
case payload: String =>
queue.offer(payload).pipeTo(self)
case QueueOfferResult.Dropped =>
log.warning("Dropped a message.")
case QueueOfferResult.Enqueued =>
log.debug("Enqueued a message.")
case QueueOfferResult.Failure(t) =>
log.error("Stream failed: {}", t.getMessage)
case QueueOfferResult.QueueClosed =>
log.debug("Stream closed.")
}
}
上面的代码只是在 PostgreSQL 发生时打印通知;您可以将 Sink.foreach(println)
替换为另一个 Sink
。给运行吧:
import akka.actor._
import akka.stream.ActorMaterializer
object Example extends App {
implicit val system = ActorSystem()
implicit val materializer = ActorMaterializer()
system.actorOf(Props(classOf[DbActor], materializer))
}
我的 postgresql 数据库中的一个函数在更新 table 时发送通知。 我正在通过 scalikejdbc 轮询 postgresql 数据库,以获取所有通知,然后对它们进行处理。 该过程解释 here 。一个典型的响应式系统 sql tables 更新。 我从 java.sql.Connection 获取 PGConnection。然后,我以这种方式收到通知:
val notifications = Option(pgConnection.getNotifications).getOrElse(Array[PGNotification]())
我试图通过将提取大小设置为 1000 并禁用自动提交来以 1000 块为单位获取通知。但是获取大小 属性 被忽略了。
知道我该怎么做吗? 我不想在我的通知数据集上的单个地图中处理数十万个通知。
pgConnection.getNotifications.size 可能很大,因此,此代码无法很好地扩展。
谢谢!!!
为了更好地扩展,请考虑使用 postgresql-async and Akka Streams: the former is a library that can obtain PostgreSQL notifications asynchronously, and the former is a Reactive Streams 提供背压的实现(这将避免分页的需要)。例如:
import akka.actor._
import akka.stream._
import akka.stream.scaladsl._
import com.github.mauricio.async.db.postgresql.PostgreSQLConnection
import com.github.mauricio.async.db.postgresql.util.URLParser
import scala.concurrent.duration._
import scala.concurrent.Await
class DbActor(implicit materializer: ActorMaterializer) extends Actor with ActorLogging {
private implicit val ec = context.system.dispatcher
val queue =
Source.queue[String](Int.MaxValue, OverflowStrategy.backpressure)
.to(Sink.foreach(println))
.run()
val configuration = URLParser.parse("jdbc:postgresql://localhost:5233/my_db?user=dbuser&password=pwd")
val connection = new PostgreSQLConnection(configuration)
Await.result(connection.connect, 5 seconds)
connection.sendQuery("LISTEN my_channel")
connection.registerNotifyListener { message =>
val msg = message.payload
log.debug("Sending the payload: {}", msg)
self ! msg
}
def receive = {
case payload: String =>
queue.offer(payload).pipeTo(self)
case QueueOfferResult.Dropped =>
log.warning("Dropped a message.")
case QueueOfferResult.Enqueued =>
log.debug("Enqueued a message.")
case QueueOfferResult.Failure(t) =>
log.error("Stream failed: {}", t.getMessage)
case QueueOfferResult.QueueClosed =>
log.debug("Stream closed.")
}
}
上面的代码只是在 PostgreSQL 发生时打印通知;您可以将 Sink.foreach(println)
替换为另一个 Sink
。给运行吧:
import akka.actor._
import akka.stream.ActorMaterializer
object Example extends App {
implicit val system = ActorSystem()
implicit val materializer = ActorMaterializer()
system.actorOf(Props(classOf[DbActor], materializer))
}