实现 "live" 流来驱动 Akka 2.4 持久化查询

Implementing a "live" stream to drive an Akka 2.4 Persistence Query

我一直在研究实验性 Akka 持久性查询模块,并且对为我的应用程序实现自定义读取日志非常感兴趣。该文档描述了两种主要类型的查询,一种是 return 日志的当前状态(例如 CurrentPersistenceIdsQuery),另一种是 return 一个可订阅的流,在提交事件时发出事件通过应用程序的写入端到日志(例如 AllPersistenceIdsQuery

对于我设计的应用程序,我使用 Postgres 和 Slick 3.1.1 来驱动这些查询的核心。我可以通过执行以下操作成功地流式传输数据库查询结果:

override def allPersistenceIds = {
  val db = Database.forConfig("postgres")
  val metadata = TableQuery[Metadata]

  val query = for (m <- metadata) yield m.persistenceId
  Source.fromPublisher(db.stream(query.result))
}

但是,一旦底层 Slick DB 操作完成,流就会被指示为完成。这似乎无法满足能够发出新事件的永久开放流的要求。

我的问题是:

谢谢!

它不像这一行代码那么微不足道,但你已经是正确的轨道了。

为了实现 "infinite" 流,您需要多次查询 - 即实现轮询,除非底层数据库允许无限查询(这里它不支持 AFAICS)。

轮询需要跟踪 "offset",因此如果您通过某个标记进行查询,并且发出另一个轮询,则需要从 [=22] 开始(现在是第二次)查询=],而不是 table 的开头。所以你需要某个地方,很可能是一个 Actor,来保持这个偏移量。

Query Side LevelDB 插件不是其他实现的最佳角色模型,因为它对底层日志及其工作方式做了很多假设。此外,LevelDB 不适用于使用 Akka Persistence 进行生产——这是我们发布的 Journal,目的是让您可以开箱即用(无需启动 Cassandra 等)。

如果您正在寻找灵感,MongoDB 插件实际上应该是一个很好的来源,因为它们与 SQL 商店有非常相似的限制。我不确定 SQL 期刊目前是否实现了查询端。

可以使用Postgres replication API to get 'infinite' stream of database events. It's supported by Postgres JDBC driver starting from version 42.0.0, see related pull request。 但是,它不是真正的流,而是来自数据库 WAL 的缓冲同步 reader。

PGReplicationStream stream =
    pgConnection
        .replicationStream()
        .logical()
        .withSlotName("test_decoding")
        .withSlotOption("include-xids", false)
        .withSlotOption("skip-empty-xacts", true)
        .start();
while (true) {
  ByteBuffer buffer = stream.read();
  //process logical changes
}

alpakka project 中为此 reader 提供 Akka Streams 适配器(源代码)会很好。