StreamCorruptedException on chronicle queue making the queue unusable
We are getting a StreamCorruptedException from Chronicle Queue (chronicle-queue-5.20.106, on Red Hat Linux 6.10); the stack trace is pasted below. Around that time an entirely different process was doing very heavy IO/disk activity, and we believe this caused Chronicle Queue to stall for more than 15 seconds, leading to this corruption.
Even after a restart the queue remains corrupted and will not start. The only way forward is to delete it and start over, which means losing millions of records.
Please help us resolve or work around the problem. Thank you.
Stack trace:
2020-11-18 09:55:38,905536 [4b54debf-f9e2-4c70-9152-f05fe840bc92] [TableStoreWriteLock] (WARN) Couldn't acquire write lock after 15000 ms for the lock file:/local/data/metadata.cq4t, overriding the lock. Lock was held by me
2020-11-18 09:55:38,905795 [4b54debf-f9e2-4c70-9152-f05fe840bc92] [TableStoreWriteLock] (WARN) Forced unlock for the lock file:/local/data/metadata.cq4t, unlocked: true net.openhft.chronicle.core.StackTrace: Forced unlock on Reader STRESSTEST01
at net.openhft.chronicle.queue.impl.table.AbstractTSQueueLock.forceUnlockIfProcessIsDead(AbstractTSQueueLock.java:52)
at net.openhft.chronicle.queue.impl.single.TableStoreWriteLock.lock(TableStoreWriteLock.java:70)
at net.openhft.chronicle.queue.impl.single.StoreAppender.writingDocument(StoreAppender.java:349)
at net.openhft.chronicle.queue.impl.single.StoreAppender.writingDocument(StoreAppender.java:325)
Followed by:
2020-11-18 09:55:42,364992 [] [ChronicleTxn] (ERROR) Error on commit java.lang.IllegalStateException: java.io.StreamCorruptedException: Data at 138604 overwritten? Expected: 0 was c3
at net.openhft.chronicle.queue.impl.single.StoreAppender$StoreAppenderContext.close(StoreAppender.java:842)
at net.openhft.chronicle.queue.impl.single.StoreAppender$StoreAppenderContext.close(StoreAppender.java:782)
Error on restart:
java.lang.UnsupportedOperationException: Unknown_4
at net.openhft.chronicle.wire.BinaryWire$BinaryValueIn.cantRead(BinaryWire.java:3648)
at net.openhft.chronicle.wire.BinaryWire$BinaryValueIn.bytes(BinaryWire.java:2591)
Simulation test class:
import net.openhft.chronicle.queue.ChronicleQueue;
import net.openhft.chronicle.queue.RollCycles;
import net.openhft.chronicle.wire.DocumentContext;

import java.io.File;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.IntStream;

public class SimulateStreamCorruptedException {
    private static final int NO_OF_DOCUMENTS_TO_INSERT = 100_000;
    private static final int NO_OF_THREADS = 50;

    private final String textToWrite = "This is a sample text to be written and value is ";
    private final String dbFolder = System.getProperty("dbFolder", "/tmp/chroniclequeue");
    private final AtomicLong noOfDocuments = new AtomicLong();

    public static void main(String[] args) throws InterruptedException {
        new SimulateStreamCorruptedException().simulateError();
    }

    private void simulateError() throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(NO_OF_THREADS);
        ScheduledExecutorService preTouchScheduler = Executors.newScheduledThreadPool(1);
        try (ChronicleQueue queue = getQueue()) {
            // Pre-touch the queue once a second to keep pages mapped
            preTouchScheduler.scheduleAtFixedRate(() -> queue.acquireAppender().pretouch(), 0, 1, TimeUnit.SECONDS);
            IntStream.rangeClosed(1, NO_OF_THREADS).forEach(i -> startWriterThread(queue, i, latch));
            latch.await();
        } finally {
            preTouchScheduler.shutdownNow();
        }
    }

    private void startWriterThread(ChronicleQueue queue, int threadCount, CountDownLatch latch) {
        Runnable task = () -> {
            System.out.println("Starting the writing for Thread-" + threadCount);
            IntStream.rangeClosed(1, NO_OF_DOCUMENTS_TO_INSERT).forEach(i -> {
                try (DocumentContext dc = queue.acquireAppender().writingDocument()) {
                    String text = textToWrite + (threadCount + i);
                    dc.wire().write().bytes(text.getBytes());
                    // Pause while the document context is still open, i.e. while the write lock is held
                    simulatePause();
                }
            });
            System.out.println("Completed the writing for Thread-" + threadCount);
            latch.countDown();
        };
        new Thread(task).start();
    }

    // Every 100th document, sleep for 20 s to exceed the 15 s write-lock timeout
    private void simulatePause() {
        if (noOfDocuments.incrementAndGet() % 100 == 0) {
            try {
                Thread.sleep(20 * 1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    private ChronicleQueue getQueue() {
        File folder = new File(dbFolder);
        if (!folder.exists()) folder.mkdirs();
        return ChronicleQueue.singleBuilder(folder)
                .rollCycle(RollCycles.DAILY)
                .strongAppenders(true)
                .build();
    }
}
If your application can stall for 15 seconds, there is no solution possible on the Chronicle Queue side - you should reconsider how your software works, as Chronicle's tools are developed with ultra-low latency in mind; we cater for microsecond latencies, not seconds.
If the lock is forcibly unlocked (which is the case here), the data will be irreversibly corrupted.
That said, a workaround could be to increase the timeout. The default is 15000 ms, but when you create the queue you can raise it with builder#timeoutMS(), specifying a value that suits your environment.
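As a sketch of that workaround, the queue construction from the repro class could be adjusted as below. The 60-second value is purely illustrative (an assumption, not a recommendation): pick something that comfortably exceeds your worst-case stall.

```java
import net.openhft.chronicle.queue.ChronicleQueue;
import net.openhft.chronicle.queue.RollCycles;

import java.io.File;

public class QueueFactory {
    // Builds a queue whose write-lock timeout exceeds the expected worst-case stall,
    // so a slow writer is not forcibly unlocked (and the file not corrupted) mid-write.
    static ChronicleQueue createQueue(File folder) {
        return ChronicleQueue.singleBuilder(folder)
                .rollCycle(RollCycles.DAILY)
                .timeoutMS(60_000) // default is 15_000 ms; 60 s is an illustrative value
                .build();
    }
}
```

Note this only makes the forced unlock less likely; it does not change the underlying behaviour that a write lock held past the timeout can be overridden.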