StreamCorruptedException on Chronicle Queue making the queue unusable

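We are getting a StreamCorruptedException from Chronicle Queue (chronicle-queue-5.20.106, Red Hat Linux release 6.10); the stack trace is pasted below. During that time a completely different process was doing very heavy disk I/O, which we believe caused Chronicle Queue to pause for more than 15 seconds and led to this corruption.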

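Even after a restart the queue is still corrupted and cannot start. The only way out is to delete it and start afresh, which means losing millions of data records.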

Please help with a fix or a workaround. Thanks.

Stack trace

2020-11-18 09:55:38,905536 [4b54debf-f9e2-4c70-9152-f05fe840bc92] [TableStoreWriteLock] (WARN) Couldn't acquire write lock after 15000 ms for the lock file:/local/data/metadata.cq4t, overriding the lock. Lock was held by me
2020-11-18 09:55:38,905795 [4b54debf-f9e2-4c70-9152-f05fe840bc92] [TableStoreWriteLock] (WARN) Forced unlock for the lock file:/local/data/metadata.cq4t, unlocked: true net.openhft.chronicle.core.StackTrace: Forced unlock on Reader STRESSTEST01
        at net.openhft.chronicle.queue.impl.table.AbstractTSQueueLock.forceUnlockIfProcessIsDead(AbstractTSQueueLock.java:52)
        at net.openhft.chronicle.queue.impl.single.TableStoreWriteLock.lock(TableStoreWriteLock.java:70)
        at net.openhft.chronicle.queue.impl.single.StoreAppender.writingDocument(StoreAppender.java:349)
        at net.openhft.chronicle.queue.impl.single.StoreAppender.writingDocument(StoreAppender.java:325)

followed by

2020-11-18 09:55:42,364992 [] [ChronicleTxn] (ERROR) Error on commit java.lang.IllegalStateException: java.io.StreamCorruptedException: Data at 138604 overwritten? Expected: 0 was c3
        at net.openhft.chronicle.queue.impl.single.StoreAppender$StoreAppenderContext.close(StoreAppender.java:842)
        at net.openhft.chronicle.queue.impl.single.StoreAppender$StoreAppenderContext.close(StoreAppender.java:782)

Error on restart

java.lang.UnsupportedOperationException: Unknown_4
        at net.openhft.chronicle.wire.BinaryWire$BinaryValueIn.cantRead(BinaryWire.java:3648)
        at net.openhft.chronicle.wire.BinaryWire$BinaryValueIn.bytes(BinaryWire.java:2591)

Test class to simulate the issue

import net.openhft.chronicle.queue.ChronicleQueue;
import net.openhft.chronicle.queue.RollCycles;
import net.openhft.chronicle.wire.DocumentContext;

import java.io.File;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.IntStream;

public class SimulateStreamCorruptedException {
private static final int NO_OF_DOCUMENTS_TO_INSERT = 100_000;
private static final int NO_OF_THREADS = 50;
private String textToWrite = "This is a sample text to be written and value is ";
private String dbFolder = System.getProperty("dbFolder","/tmp/chroniclequeue");
private AtomicLong noOfDocuments = new AtomicLong();

public static void main(String[] args) throws InterruptedException {
    SimulateStreamCorruptedException simulator = new SimulateStreamCorruptedException();
    simulator.simulateError();
}

private void simulateError() throws InterruptedException {
    CountDownLatch latch = new CountDownLatch(NO_OF_THREADS);
    ScheduledExecutorService preTouchScheduler = Executors.newScheduledThreadPool(1);
    try(ChronicleQueue queue = getQueue()) {
        preTouchScheduler.scheduleAtFixedRate(() -> queue.acquireAppender().pretouch(), 0, 1, TimeUnit.SECONDS);
        IntStream.rangeClosed(1, NO_OF_THREADS).forEach(i -> startWriterThread(queue,i,latch));
        latch.await();
    } finally {
        preTouchScheduler.shutdownNow();
    }
}

private void startWriterThread(ChronicleQueue queue,int threadCount,CountDownLatch latch) {
    Runnable task = () -> {
        System.out.println("Starting the writing for Thread-"+threadCount);
        IntStream.rangeClosed(1, NO_OF_DOCUMENTS_TO_INSERT).forEach(i -> {
            try(DocumentContext dc = queue.acquireAppender().writingDocument()) {
                String text = textToWrite+(threadCount+i);
                dc.wire().write().bytes(text.getBytes());
                simulatePause();
            }
        });
        System.out.println("Completed the writing for Thread-"+threadCount);
        latch.countDown();
    };
    new Thread(task).start();
}

private void simulatePause() {
    if(noOfDocuments.incrementAndGet()%100==0) {
        try {Thread.sleep(20*1000);}
        catch (InterruptedException e) {e.printStackTrace();}
    }
}

private ChronicleQueue getQueue() {
    File folder = new File(dbFolder);
    if(!folder.exists()) folder.mkdirs();
    return ChronicleQueue.singleBuilder(folder)
            .rollCycle(RollCycles.DAILY)
            .strongAppenders(true)
            .build();
}

}

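If there is a chance that your application can stall for 15 seconds, there is no possible solution on the Chronicle Queue side. You should reconsider how your software works, because Chronicle's tools are developed with ultra-low latency in mind, and we cater for microsecond latencies, not seconds.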

If the lock is forcibly unlocked (which is the case here), the data will be irreversibly corrupted.

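A workaround, however, could be to increase the timeout. The default is 15000 ms, but when creating the queue you can raise it by using builder#timeoutMS() to specify a value that suits your environment.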
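
As a rough sketch of that workaround, the builder from the question's getQueue() could be given a longer timeout as below. The class name QueueWithLongerTimeout and the 60 second value are only illustrative assumptions, not recommendations. Pick a value larger than the longest stall you expect, and note that this only makes the forced unlock less likely; it does not repair a queue that is already corrupted.

```java
import net.openhft.chronicle.queue.ChronicleQueue;
import net.openhft.chronicle.queue.RollCycles;

import java.io.File;

public class QueueWithLongerTimeout {
    // Assumption: 60 s is an illustrative value chosen to exceed the 20 s pause
    // simulated in the question; tune it for your own environment.
    private static final long LOCK_TIMEOUT_MS = 60_000L;

    public static void main(String[] args) {
        File folder = new File(System.getProperty("dbFolder", "/tmp/chroniclequeue"));
        // Same builder as in the question, with the timeout raised from its
        // 15000 ms default via timeoutMS().
        try (ChronicleQueue queue = ChronicleQueue.singleBuilder(folder)
                .rollCycle(RollCycles.DAILY)
                .timeoutMS(LOCK_TIMEOUT_MS)
                .build()) {
            queue.acquireAppender().writeText("hello");
        }
    }
}
```

Every process that opens the same queue directory would presumably need to be built with the same setting, since any one of them can force the unlock after its own timeout expires.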