NATS JetStream:是否可以明确要求 JetStream 发送(重新)发送它在主题 foo.* 中收到的最后几条消息?

NATS JetStream: Is it possible to explicitly ask from JetStream to (re)send the last few messages it received in subject foo.*?

基本上是主题所说的内容。

我想知道是否可以通过某种方式查询 JetStream,使我们能够重新获取主题“foo.*”的最后 15 条消息,或者 JetStream 最近收到的关于主题“foo.*”的消息1.5秒。

如果可能的话,欢迎提供任何代码示例或代码示例链接。

据官方报道docs

  • 可以从某个时间开始抓取消息:最后1.5秒。

DeliverByStartTime
When first consuming messages, start with messages on or after this time. The consumer is required to specify OptStartTime, the time in the stream to start at. It will receive the closest available message on or after that time.

  • 另一个要求,最近15条消息,我觉得不可能
  • 有一种方法可以在 JetStream 中实现 time-related 检索。

    now := time.Now()
    oneAndHalfSecondAgo := now.Add(time.Millisecond * -1500)
    
    js, _ := nc.JetStream()
    sub, err := js.SubscribeSync(
         "foo.*",
         nats.OrderedConsumer(),
         nats.StartTime(oneAndHalfSecondAgo),
    )
    
    for {
        msg, err := sub.NextMsg(10 * time.Second) //oldest->newer ones
        if err != nil {
            log.Fatal(err)
        }
    
        // 1. check timestamp of message and if its after ‘now’ then we break out of the for loop here
    
        // 2. if the message is before now we can push it in an array here
    }
    

    请注意,这种技术虽然有用,但效率很低,因为我们是一条一条地抓取消息。

    我们可以使用 .Subscribe()(它是异步的)修改它,但那样我们会遇到不同的问题:

    我们会在当前时刻从 JetStream 过度拉取,然后我们必须确保我们抓取的缓冲消息确实会返回到 JetStream。据我所知,没有配置选项可以告诉 JetStream 关​​于“MaxTime”的信息。

  • 至于如何“获得最新的N-Messages”,可以修改上面的code-sample,这样他就会得到相当多的消息(p.e. 过去 5 秒或 10 秒或 30 秒内的所有消息),在获取到当前时刻的所有消息后,他可以获取最新的 'N' 条消息。

    当然这种技术并不理想,但似乎没有另一种方法可以做到这一点 - 至少在撰写本文时没有。