Aeron Archive - 扩展现有记录

Aeron Archive - extend existing recording

我有一个 Aeron Archive,想扩展现有的记录,即它在服务重启后继续附加消息。

我找不到任何实际的例子如何做到这一点,所以我带来了我自己的基于 Aeron javadoc/Aeron Cookbook.

的代码

我目前有什么

我试图先从存档本身获取 lastRecordingId、位置等。

private void findLatestRecording() {
        final RecordingDescriptorConsumer consumer =
                (controlSessionId, correlationId, recordingId,
                        startTimestamp, stopTimestamp, startPosition,
                        stopPosition, initialTermId, segmentFileLength,
                        termBufferLength, mtuLength, sessionId,
                        streamId, strippedChannel, originalChannel,
                        sourceIdentity) ->  {
                    AeronArchiveJournal.this.lastRecordingId = recordingId;
                    AeronArchiveJournal.this.lastRecordingPosition = stopPosition;
                    AeronArchiveJournal.this.initialTermId = initialTermId;
                    AeronArchiveJournal.this.termBufferLength = termBufferLength;
                };

        final long fromRecordingId = 0L;
        final int recordCount = 100;
        final int foundCount = archive.listRecordingsForUri(fromRecordingId, recordCount, AeronChannels.ipc(), AeronStreams.STREAM_ID_JOURNAL, consumer);

        if (foundCount == 0) {
            LOG.info("No previous recording found, will start a new one");
        }
    }

那我想延长录音时间

 private void extendExistingRecording() {
        publication = aeron.addExclusivePublication(AeronChannels.ipc(), AeronStreams.STREAM_ID_JOURNAL);

        String channelUri = new ChannelUriStringBuilder()
                .media(CommonContext.IPC_MEDIA)
                .initialPosition(lastRecordingPosition, initialTermId, termBufferLength)
                .build();

        LOG.info("Extending existing recording");
        LOG.info("Recording id: {}", lastRecordingId);
        LOG.info("Channel URI: {}", channelUri);


        archive.extendRecording(lastRecordingId, channelUri, AeronStreams.STREAM_ID_JOURNAL, SourceLocation.LOCAL);

        LOG.info("Waiting for recording to start for session {}", publication.sessionId());
        final CountersReader counters = aeron.countersReader();
        int counterId = RecordingPos.findCounterIdBySession(counters, publication.sessionId());

        while (CountersReader.NULL_COUNTER_ID == counterId) {
            journalIdleStrategy.idle();
            counterId = RecordingPos.findCounterIdBySession(counters, publication.sessionId());
        }
        lastRecordingId = RecordingPos.getRecordingId(counters, counterId);

    }

然而 CountersReader 循环永远不会结束并且 aeron 打印以下警告:

io.aeron.archive.client.ArchiveEvent: WARN - cannot extend recording 0 image.joinPosition=0 != rec.stopPosition=25312

显然我遗漏了什么,但到目前为止我无法弄清楚是什么。

要扩展现有记录,必须设置用于扩展现有记录的出版物的初始位置。流需要初始化,所以它是一个真正的扩展。这个测试展示了它是如何完成的。

https://github.com/real-logic/aeron/blob/master/aeron-system-tests/src/test/java/io/aeron/archive/ExtendRecordingTest.java