如何用redisson和ZIO监听redis list事件

How to listen redis list event with redisson and ZIO

在丽笙酒店,只需接收活动就足以将新项目添加到列表中。为此,您需要执行以下操作:

object Test extends App {
  val redisson = Redisson.create()
  val events = redisson.getQueue[String]("minio_events", new StringCodec())
  events.addListener(new ListAddListener() {
    override def onListAdd(o: String): Unit = println(o)
  })
}

当需要将其包装在 ZIO 中时,困难就开始了。如何将此事件包装在 ZIO 或 ZStream 中以启动事件处理链?

ZStream 是基于拉取的,这意味着您必须以某种方式从 minio_events 中拉取数据

val redisson = Redisson.create()    
val bqueue : RQueue[String] = redisson.getQueue("minio_events", new StringCodec())
    
val pollQueue = 
  ZIO
    .effect(Option(bqueue.poll())) // RQueue.poll returns null if the queue is empty
    .someOrFail(NoElementsOnStream)

这会创建一个 ZIO[Any, Throwable, String] 表示您的轮询操作,现在可以通过调用 ZStream.fromEffect 方法

将其转换为 ZStream
ZStream
  .fromEffect(pollQueue)
  .foreach(s => putStrLn(s))
  .exitCode

如果将这段代码放在 zio.App 主函数中,您会发现它只 运行 一次。所以我们需要让它永远 运行 并重试直到找到另一个元素

ZStream
  .fromEffect(pollQueue)
  .retry(Schedule.spaced(1.second))
  .forever
  .foreach(s => putStrLn(s))  
  .exitCode

Redisson 似乎支持将 RedissonClient 转换为具有 zio-interop 的响应式流客户端。但是如果你只想直接使用 java 界面,我想你可以这样做(注意我还没有实际测试过):

object Test extends zio.App {
   def run(args: List[String]): ZIO[ZEnv, Nothing, ExitCode] = 
       (for {
         // Construct the client in the scope of the stream so it shuts down when done
         c <- ZStream.managed(ZManaged.makeEffect(Redisson.create())(_.shutdown()))

         // Variant of effectAsync* that lets you specify an interrupter
         s <- ZStream.effectAsyncInterrupt[Any, Nothing, String] { k =>
           val queue = c.getQueue[String]("", new StringCodec())
           val listenerId = queue.addListener(new ListAddListener {

             // Invoke the callback by passing in a ZIO with a single chunk
             def onListAdd(name: String): Unit = k(ZIO.succeed(Chunk.single(name)))
           })

           // Return a cancellation handler.
           Left(UIO(queue.removeListener(listenerId)))
        }
      } { zio.console.putStrLn(s) }).exitCode
}