如何用redisson和ZIO监听redis list事件
How to listen redis list event with redisson and ZIO
在丽笙酒店,只需接收活动就足以将新项目添加到列表中。为此,您需要执行以下操作:
object Test extends App {
val redisson = Redisson.create()
val events = redisson.getQueue[String]("minio_events", new StringCodec())
events.addListener(new ListAddListener() {
override def onListAdd(o: String): Unit = println(o)
})
}
当需要将其包装在 ZIO 中时,困难就开始了。如何将此事件包装在 ZIO 或 ZStream 中以启动事件处理链?
ZStream 是基于拉取的,这意味着您必须以某种方式从 minio_events
中拉取数据
val redisson = Redisson.create()
val bqueue : RQueue[String] = redisson.getQueue("minio_events", new StringCodec())
val pollQueue =
ZIO
.effect(Option(bqueue.poll())) // RQueue.poll returns null if the queue is empty
.someOrFail(NoElementsOnStream)
这会创建一个 ZIO[Any, Throwable, String]
表示您的轮询操作,现在可以通过调用 ZStream.fromEffect
方法
将其转换为 ZStream
ZStream
.fromEffect(pollQueue)
.foreach(s => putStrLn(s))
.exitCode
如果将这段代码放在 zio.App
主函数中,您会发现它只 运行 一次。所以我们需要让它永远 运行 并重试直到找到另一个元素
ZStream
.fromEffect(pollQueue)
.retry(Schedule.spaced(1.second))
.forever
.foreach(s => putStrLn(s))
.exitCode
Redisson 似乎支持将 RedissonClient
转换为具有 zio-interop 的响应式流客户端。但是如果你只想直接使用 java 界面,我想你可以这样做(注意我还没有实际测试过):
object Test extends zio.App {
def run(args: List[String]): ZIO[ZEnv, Nothing, ExitCode] =
(for {
// Construct the client in the scope of the stream so it shuts down when done
c <- ZStream.managed(ZManaged.makeEffect(Redisson.create())(_.shutdown()))
// Variant of effectAsync* that lets you specify an interrupter
s <- ZStream.effectAsyncInterrupt[Any, Nothing, String] { k =>
val queue = c.getQueue[String]("", new StringCodec())
val listenerId = queue.addListener(new ListAddListener {
// Invoke the callback by passing in a ZIO with a single chunk
def onListAdd(name: String): Unit = k(ZIO.succeed(Chunk.single(name)))
})
// Return a cancellation handler.
Left(UIO(queue.removeListener(listenerId)))
}
} { zio.console.putStrLn(s) }).exitCode
}
在丽笙酒店,只需接收活动就足以将新项目添加到列表中。为此,您需要执行以下操作:
object Test extends App {
val redisson = Redisson.create()
val events = redisson.getQueue[String]("minio_events", new StringCodec())
events.addListener(new ListAddListener() {
override def onListAdd(o: String): Unit = println(o)
})
}
当需要将其包装在 ZIO 中时,困难就开始了。如何将此事件包装在 ZIO 或 ZStream 中以启动事件处理链?
ZStream 是基于拉取的,这意味着您必须以某种方式从 minio_events
中拉取数据
val redisson = Redisson.create()
val bqueue : RQueue[String] = redisson.getQueue("minio_events", new StringCodec())
val pollQueue =
ZIO
.effect(Option(bqueue.poll())) // RQueue.poll returns null if the queue is empty
.someOrFail(NoElementsOnStream)
这会创建一个 ZIO[Any, Throwable, String]
表示您的轮询操作,现在可以通过调用 ZStream.fromEffect
方法
ZStream
ZStream
.fromEffect(pollQueue)
.foreach(s => putStrLn(s))
.exitCode
如果将这段代码放在 zio.App
主函数中,您会发现它只 运行 一次。所以我们需要让它永远 运行 并重试直到找到另一个元素
ZStream
.fromEffect(pollQueue)
.retry(Schedule.spaced(1.second))
.forever
.foreach(s => putStrLn(s))
.exitCode
Redisson 似乎支持将 RedissonClient
转换为具有 zio-interop 的响应式流客户端。但是如果你只想直接使用 java 界面,我想你可以这样做(注意我还没有实际测试过):
object Test extends zio.App {
def run(args: List[String]): ZIO[ZEnv, Nothing, ExitCode] =
(for {
// Construct the client in the scope of the stream so it shuts down when done
c <- ZStream.managed(ZManaged.makeEffect(Redisson.create())(_.shutdown()))
// Variant of effectAsync* that lets you specify an interrupter
s <- ZStream.effectAsyncInterrupt[Any, Nothing, String] { k =>
val queue = c.getQueue[String]("", new StringCodec())
val listenerId = queue.addListener(new ListAddListener {
// Invoke the callback by passing in a ZIO with a single chunk
def onListAdd(name: String): Unit = k(ZIO.succeed(Chunk.single(name)))
})
// Return a cancellation handler.
Left(UIO(queue.removeListener(listenerId)))
}
} { zio.console.putStrLn(s) }).exitCode
}