Mongo change-Stream with Spring resumeAt vs startAfter 和连接丢失时的容错

Mongo change-Stream with Spring resumeAt vs startAfter and fault tolerance in case of connection loss

在 Whosebug 或任何文档中都找不到答案, 我有以下更改流代码(收听数据库而不是特定集合)

Mongo版本是4.2

@Configuration
public class DatabaseChangeStreamListener {

//Constructor, fields etc...

    @PostConstruct
    public void initialize() {
        MessageListenerContainer container = new DefaultMessageListenerContainer(mongoTemplate, new SimpleAsyncTaskExecutor(), this::onException);
        ChangeStreamRequest.ChangeStreamRequestOptions options =
                new ChangeStreamRequest.ChangeStreamRequestOptions(mongoTemplate.getDb().getName(), null, buildChangeStreamOptions());
        container.register(new ChangeStreamRequest<>(this::onDatabaseChangedEvent, options), Document.class);
        container.start();
    }

    private ChangeStreamOptions buildChangeStreamOptions() {
        return ChangeStreamOptions.builder()
                .returnFullDocumentOnUpdate()
                .filter(newAggregation(match(where(OPERATION_TYPE).in(INSERT.getValue(), UPDATE.getValue(), REPLACE.getValue(), DELETE.getValue()))))
                .resumeAt(Instant.now().minusSeconds(1))
                .build();
    }
//more code
}

我希望流仅从系统启动时间开始监听,而不在操作日志中事先获取任何内容,.resumeAt(Instant.now().minusSeconds(1)) 可以吗? 如果需要,我是否需要使用 starAfter 方法?如何在数据库中找到最新的 resumeToken? 还是开箱即用,我不需要添加任何 resume/start 行?

第二个问题,我从不停止容器(它应该在应用程序处于 运行 时始终存在),如果与 mongoDB 断开连接并重新连接,当前配置中的侦听器将继续使用消息? (我模拟DB断线好难)

如果它不会恢复处理事件,我需要在配置中更改什么,以便更改流将继续并获取断开连接之前最后接收到的 resumeToken 的所有事件? 我在媒体 change stream in prodcution 上阅读了这篇很棒的文章, 但它直接使用光标,我想使用spring DefaultMessageListenerContainer,因为它更优雅。

所以我会回答我自己的(一些更愚蠢,一些更少:)...)问题:

  1. 当没有提供 resumeAt 时间戳时,更改流将从当前时间开始,并且不会绘制任何以前的事件。
  2. resumeAfter 事件与时间戳的区别可以在这里找到: 但请记住,对于时间戳,它包含事件,因此如果您想从下一个事件(在 java 中)开始,请执行:
    private BsonTimestamp getNextEventTimestamp(BsonTimestamp timestamp) {
        return new BsonTimestamp(timestamp.getValue() + 1);
    }
  1. 如果互联网断开,更改流将不会恢复, 因此,我建议在出现错误时采取以下方法:
    private void onException() {
        ScheduledExecutorService executorService = newSingleThreadScheduledExecutor();
        executorService.scheduleAtFixedRate(() -> recreateChangeStream(executorService), 0, 1, TimeUnit.SECONDS);
    }

    private void recreateChangeStream(ScheduledExecutorService executorService) {
        try {
            mongoTemplate.getDb().runCommand(new BasicDBObject("ping", "1"));
            container.stop();
            startNewContainer();
            executorService.shutdown();
        } catch (Exception ignored) {
        }
    }

首先,我正在创建一个始终运行的可运行计划任务(但一次只能运行 1 个 newSingleThreadScheduledExecutor()),我正在尝试 ping 数据库,成功 ping 后我将停止旧容器并启动一个新的,你也可以传递你最后一次拍摄的时间戳,这样你就可以得到你可能错过的所有事件

从事件中检索时间戳:

BsonTimestamp resumeAtTimestamp = changeStreamDocument.getClusterTime();

那我就关掉任务了

还要确保 resumeAtTimestamp 存在于 oplog 中...