Aeron Archive - 扩展现有记录
Aeron Archive - extend existing recording
我有一个 Aeron Archive,想扩展现有的记录,即它在服务重启后继续附加消息。
我找不到任何实际的例子如何做到这一点,所以我带来了我自己的基于 Aeron javadoc/Aeron Cookbook.
的代码
我目前有什么
我试图先从存档本身获取 lastRecordingId、位置等。
private void findLatestRecording() {
final RecordingDescriptorConsumer consumer =
(controlSessionId, correlationId, recordingId,
startTimestamp, stopTimestamp, startPosition,
stopPosition, initialTermId, segmentFileLength,
termBufferLength, mtuLength, sessionId,
streamId, strippedChannel, originalChannel,
sourceIdentity) -> {
AeronArchiveJournal.this.lastRecordingId = recordingId;
AeronArchiveJournal.this.lastRecordingPosition = stopPosition;
AeronArchiveJournal.this.initialTermId = initialTermId;
AeronArchiveJournal.this.termBufferLength = termBufferLength;
};
final long fromRecordingId = 0L;
final int recordCount = 100;
final int foundCount = archive.listRecordingsForUri(fromRecordingId, recordCount, AeronChannels.ipc(), AeronStreams.STREAM_ID_JOURNAL, consumer);
if (foundCount == 0) {
LOG.info("No previous recording found, will start a new one");
}
}
那我想延长录音时间
private void extendExistingRecording() {
publication = aeron.addExclusivePublication(AeronChannels.ipc(), AeronStreams.STREAM_ID_JOURNAL);
String channelUri = new ChannelUriStringBuilder()
.media(CommonContext.IPC_MEDIA)
.initialPosition(lastRecordingPosition, initialTermId, termBufferLength)
.build();
LOG.info("Extending existing recording");
LOG.info("Recording id: {}", lastRecordingId);
LOG.info("Channel URI: {}", channelUri);
archive.extendRecording(lastRecordingId, channelUri, AeronStreams.STREAM_ID_JOURNAL, SourceLocation.LOCAL);
LOG.info("Waiting for recording to start for session {}", publication.sessionId());
final CountersReader counters = aeron.countersReader();
int counterId = RecordingPos.findCounterIdBySession(counters, publication.sessionId());
while (CountersReader.NULL_COUNTER_ID == counterId) {
journalIdleStrategy.idle();
counterId = RecordingPos.findCounterIdBySession(counters, publication.sessionId());
}
lastRecordingId = RecordingPos.getRecordingId(counters, counterId);
}
然而 CountersReader
循环永远不会结束并且 aeron 打印以下警告:
io.aeron.archive.client.ArchiveEvent: WARN - cannot extend recording 0 image.joinPosition=0 != rec.stopPosition=25312
显然我遗漏了什么,但到目前为止我无法弄清楚是什么。
要扩展现有记录,必须设置用于扩展现有记录的出版物的初始位置。流需要初始化,所以它是一个真正的扩展。这个测试展示了它是如何完成的。
我有一个 Aeron Archive,想扩展现有的记录,即它在服务重启后继续附加消息。
我找不到任何实际的例子如何做到这一点,所以我带来了我自己的基于 Aeron javadoc/Aeron Cookbook.
的代码我目前有什么
我试图先从存档本身获取 lastRecordingId、位置等。
private void findLatestRecording() {
final RecordingDescriptorConsumer consumer =
(controlSessionId, correlationId, recordingId,
startTimestamp, stopTimestamp, startPosition,
stopPosition, initialTermId, segmentFileLength,
termBufferLength, mtuLength, sessionId,
streamId, strippedChannel, originalChannel,
sourceIdentity) -> {
AeronArchiveJournal.this.lastRecordingId = recordingId;
AeronArchiveJournal.this.lastRecordingPosition = stopPosition;
AeronArchiveJournal.this.initialTermId = initialTermId;
AeronArchiveJournal.this.termBufferLength = termBufferLength;
};
final long fromRecordingId = 0L;
final int recordCount = 100;
final int foundCount = archive.listRecordingsForUri(fromRecordingId, recordCount, AeronChannels.ipc(), AeronStreams.STREAM_ID_JOURNAL, consumer);
if (foundCount == 0) {
LOG.info("No previous recording found, will start a new one");
}
}
那我想延长录音时间
private void extendExistingRecording() {
publication = aeron.addExclusivePublication(AeronChannels.ipc(), AeronStreams.STREAM_ID_JOURNAL);
String channelUri = new ChannelUriStringBuilder()
.media(CommonContext.IPC_MEDIA)
.initialPosition(lastRecordingPosition, initialTermId, termBufferLength)
.build();
LOG.info("Extending existing recording");
LOG.info("Recording id: {}", lastRecordingId);
LOG.info("Channel URI: {}", channelUri);
archive.extendRecording(lastRecordingId, channelUri, AeronStreams.STREAM_ID_JOURNAL, SourceLocation.LOCAL);
LOG.info("Waiting for recording to start for session {}", publication.sessionId());
final CountersReader counters = aeron.countersReader();
int counterId = RecordingPos.findCounterIdBySession(counters, publication.sessionId());
while (CountersReader.NULL_COUNTER_ID == counterId) {
journalIdleStrategy.idle();
counterId = RecordingPos.findCounterIdBySession(counters, publication.sessionId());
}
lastRecordingId = RecordingPos.getRecordingId(counters, counterId);
}
然而 CountersReader
循环永远不会结束并且 aeron 打印以下警告:
io.aeron.archive.client.ArchiveEvent: WARN - cannot extend recording 0 image.joinPosition=0 != rec.stopPosition=25312
显然我遗漏了什么,但到目前为止我无法弄清楚是什么。
要扩展现有记录,必须设置用于扩展现有记录的出版物的初始位置。流需要初始化,所以它是一个真正的扩展。这个测试展示了它是如何完成的。