Correct way to stop custom logback async appender
I've created Amazon SQS and SNS logback appenders using Amazon's Java SDK. The basic appenders use the synchronous Java APIs, but I've also created asynchronous versions of both by extending the ch.qos.logback.classic.AsyncAppender class.
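Stopping the logback logger context with the async appenders does not work as expected, though. When the context is stopped, all async appenders try to flush the remaining events before exiting. The problem originates in the ch.qos.logback.core.AsyncAppenderBase#stop method, which interrupts the worker thread. The interrupt arrives while the Amazon SDK is still processing the queued events and results in a com.amazonaws.AbortedException. In my tests the AbortedException happened while the SDK was processing a response from the API, so the actual message went through, but this might not always be the case.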
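To show the failure mode in isolation, here is a minimal JDK-only sketch. It is not logback or AWS SDK code: the class `InterruptedSendDemo` and its methods are hypothetical, and `Thread.sleep` merely stands in for a blocking, interruptible SDK call. The worker is interrupted while a send is in flight, so the call is aborted instead of completing.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;

final class InterruptedSendDemo {

    /** Stand-in for a blocking, interruptible SDK call; true only if it ran to completion. */
    static boolean send(CountDownLatch sendStarted) {
        sendStarted.countDown(); // tell the caller that the send is in flight
        try {
            Thread.sleep(5_000); // simulated network round trip
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // keep the flag for the caller
            return false;                       // the in-flight call was aborted
        }
    }

    /** Starts a worker, interrupts it mid-send (as a stop() would), and reports delivery. */
    static boolean deliveredWhenInterrupted() {
        CountDownLatch sendStarted = new CountDownLatch(1);
        AtomicBoolean delivered = new AtomicBoolean();
        Thread worker = new Thread(() -> delivered.set(send(sendStarted)), "worker");
        worker.start();
        try {
            sendStarted.await(); // wait until the worker has begun sending
            worker.interrupt();  // what an interrupting stop() does to the worker
            worker.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return delivered.get();
    }

    public static void main(String[] args) {
        System.out.println("delivered = " + deliveredWhenInterrupted()); // prints "delivered = false"
    }
}
```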
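Is it intended that logback interrupts the worker thread even though the worker should still process the remaining event queue? And if so, how can I work around the AbortedException caused by the interrupt? I could override the whole stop method and remove the interrupt, but that would require copy-pasting most of the implementation.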
I finally managed to figure out a solution, which I guess is not optimal and is far from simple, but it works.
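My first attempt was to use the asynchronous versions of the AWS SDK APIs with the executor provided by logback, because with an internal executor the interrupt problem could be avoided. But this didn't work out, because the work queues are shared, and in this case the queue has to be appender specific so that it can be stopped correctly. So I needed a separate executor for each appender.
First of all I needed an executor for the AWS clients. The catch with the executor is that the provided thread factory must create daemon threads, otherwise it will block indefinitely if logback's JVM shutdown hook is used.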
public static ExecutorService newExecutor(Appender<?> appender, int threadPoolSize) {
    final String name = appender.getName();
    return Executors.newFixedThreadPool(threadPoolSize, new ThreadFactory() {
        private final AtomicInteger idx = new AtomicInteger(1);

        @Override
        public Thread newThread(Runnable r) {
            Thread thread = new Thread(r);
            // name the threads after the appender so they are easy to spot in thread dumps
            thread.setName(name + "-" + idx.getAndIncrement());
            // daemon threads do not keep the JVM alive while the shutdown hook runs
            thread.setDaemon(true);
            return thread;
        }
    });
}
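As a quick check of the daemon-thread idea outside of logback, the following JDK-only sketch builds the same kind of executor from a plain name instead of an `Appender` and reports what a task sees from inside the pool. The class `DaemonExecutorDemo` and the method `describeWorker` are hypothetical names used only for this illustration.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

final class DaemonExecutorDemo {

    /** Fixed pool whose threads are daemons named "<name>-<n>". */
    static ExecutorService newExecutor(String name, int threadPoolSize) {
        AtomicInteger idx = new AtomicInteger(1);
        ThreadFactory factory = r -> {
            Thread thread = new Thread(r, name + "-" + idx.getAndIncrement());
            thread.setDaemon(true);
            return thread;
        };
        return Executors.newFixedThreadPool(threadPoolSize, factory);
    }

    /** Runs one task and returns "<thread name>:<daemon flag>" as seen inside it. */
    static String describeWorker(String name) {
        ExecutorService executor = newExecutor(name, 1);
        try {
            return executor.submit(() -> {
                Thread t = Thread.currentThread();
                return t.getName() + ":" + t.isDaemon();
            }).get();
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(describeWorker("sqs")); // prints "sqs-1:true"
    }
}
```

Because the pool threads are daemons, forgetting to shut the executor down cannot keep the JVM alive, which is the property the shutdown hook case depends on.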
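The next question was how to stop the appender correctly despite the interrupt. This requires handling the InterruptedException with a retry, because otherwise the executor would skip waiting for the queue to be flushed.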
public static void shutdown(Appender<?> appender, ExecutorService executor, long waitMillis) {
    executor.shutdown();
    boolean completed = awaitTermination(appender, executor, waitMillis);
    if (!completed) {
        appender.addWarn(format("Executor for %s did not shut down in %d milliseconds, " +
                                "logging events might have been discarded",
                                appender.getName(), waitMillis));
    }
}

private static boolean awaitTermination(Appender<?> appender, ExecutorService executor, long waitMillis) {
    long started = System.currentTimeMillis();
    try {
        return executor.awaitTermination(waitMillis, TimeUnit.MILLISECONDS);
    } catch (InterruptedException ie1) {
        // the worker loop is stopped by interrupt, but the remaining queue should still be handled
        long waited = System.currentTimeMillis() - started;
        if (waited < waitMillis) {
            try {
                return executor.awaitTermination(waitMillis - waited, TimeUnit.MILLISECONDS);
            } catch (InterruptedException ie2) {
                appender.addError(format("Shut down of executor for %s was interrupted",
                                         appender.getName()));
            }
        }
        Thread.currentThread().interrupt();
    }
    return false;
}
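The retry logic above can be tried without logback. This JDK-only sketch sets the caller's interrupt flag before waiting, so the first `awaitTermination` call throws immediately and the retry is what actually waits for the queued tasks. The class `ShutdownRetryDemo` and its method names are hypothetical, and logging is left out.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

final class ShutdownRetryDemo {

    /** Waits for termination; if interrupted once, retries with the remaining time budget. */
    static boolean awaitWithRetry(ExecutorService executor, long waitMillis) {
        long started = System.nanoTime();
        try {
            return executor.awaitTermination(waitMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException first) {
            long waited = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - started);
            if (waited < waitMillis) {
                try {
                    return executor.awaitTermination(waitMillis - waited, TimeUnit.MILLISECONDS);
                } catch (InterruptedException second) {
                    // interrupted twice: give up and fall through
                }
            }
            Thread.currentThread().interrupt(); // restore the flag for the caller
            return false;
        }
    }

    /** Queues three slow tasks, shuts down, and waits while the caller is already interrupted. */
    static int flushedDespiteInterrupt() {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        AtomicInteger done = new AtomicInteger();
        for (int i = 0; i < 3; i++) {
            executor.execute(() -> {
                try {
                    Thread.sleep(100); // simulated send
                } catch (InterruptedException e) {
                    return;
                }
                done.incrementAndGet();
            });
        }
        executor.shutdown();                // no new tasks; queued ones still run
        Thread.currentThread().interrupt(); // simulate the interrupt sent on stop
        boolean terminated = awaitWithRetry(executor, 5_000);
        return terminated ? done.get() : -1;
    }

    public static void main(String[] args) {
        System.out.println("flushed tasks = " + flushedDespiteInterrupt()); // prints "flushed tasks = 3"
    }
}
```

Without the retry, the first `InterruptedException` would end the wait at once and the queued tasks would be abandoned when the JVM exits.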
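A normal logback appender is expected to work synchronously, so it should not lose logging events even without a proper shutdown hook. That is a problem with the asynchronous AWS SDK API calls used here. I decided to use a countdown latch to provide blocking appender behavior.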
public class LoggingEventHandler<REQUEST extends AmazonWebServiceRequest, RESULT> implements AsyncHandler<REQUEST, RESULT> {

    private final ContextAware contextAware;
    private final CountDownLatch latch;
    private final String errorMessage;

    public LoggingEventHandler(ContextAware contextAware, CountDownLatch latch, String errorMessage) {
        this.contextAware = contextAware;
        this.latch = latch;
        this.errorMessage = errorMessage;
    }

    @Override
    public void onError(Exception exception) {
        contextAware.addWarn(errorMessage, exception);
        latch.countDown();
    }

    @Override
    public void onSuccess(REQUEST request, RESULT result) {
        latch.countDown();
    }
}
And to handle waiting for the latch:
public static void awaitLatch(Appender<?> appender, CountDownLatch latch, long waitMillis) {
    if (latch.getCount() > 0) {
        try {
            boolean completed = latch.await(waitMillis, TimeUnit.MILLISECONDS);
            if (!completed) {
                appender.addWarn(format("Appender '%s' did not complete sending event in %d milliseconds, " +
                                        "the event might have been lost",
                                        appender.getName(), waitMillis));
            }
        } catch (InterruptedException ex) {
            appender.addWarn(format("Appender '%s' was interrupted, " +
                                    "a logging event might have been lost or shutdown was initiated",
                                    appender.getName()));
            Thread.currentThread().interrupt();
        }
    }
}
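The latch pattern itself is independent of the AWS SDK. The sketch below is a JDK-only illustration in which an executor task stands in for the asynchronous client call and a `Runnable` callback stands in for the `AsyncHandler`. The class `BlockingSendDemo` and its methods are hypothetical names, not part of any library.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

final class BlockingSendDemo {

    /** Stand-in for an asynchronous client call that invokes a callback when it finishes. */
    static void sendAsync(ExecutorService executor, long workMillis, Runnable onDone) {
        executor.execute(() -> {
            try {
                Thread.sleep(workMillis); // simulated network round trip
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                onDone.run(); // signal on success and on failure alike
            }
        });
    }

    /** Sends asynchronously but blocks the caller up to waitMillis; true if the send completed in time. */
    static boolean sendAndWait(long workMillis, long waitMillis) {
        ExecutorService executor = Executors.newSingleThreadExecutor(r -> {
            Thread t = new Thread(r);
            t.setDaemon(true);
            return t;
        });
        CountDownLatch latch = new CountDownLatch(1);
        try {
            sendAsync(executor, workMillis, latch::countDown);
            return latch.await(waitMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println(sendAndWait(10, 2_000)); // prints "true": finished within the budget
        System.out.println(sendAndWait(2_000, 50)); // prints "false": timed out while waiting
    }
}
```

Counting down in both the success and the failure path matters: if the error path skipped it, every failed send would block the caller for the full timeout.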
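And then everything bundled together. The following example is a simplified version of the real implementation, showing only the parts relevant to this issue.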
public class SqsAppender extends UnsynchronizedAppenderBase<ILoggingEvent> {

    private AmazonSQSAsyncClient sqs;

    @Override
    public void start() {
        sqs = new AmazonSQSAsyncClient(
                getCredentials(),
                getClientConfiguration(),
                // appender-specific executor with daemon threads (the newExecutor helper above)
                AppenderExecutors.newExecutor(this, getThreadPoolSize())
        );
        super.start();
    }

    @Override
    public void stop() {
        super.stop();
        if (sqs != null) {
            // flush the queued sends before shutting the client down
            AppenderExecutors.shutdown(this, sqs.getExecutorService(), getMaxFlushTime());
            sqs.shutdown();
            sqs = null;
        }
    }

    @Override
    protected void append(final ILoggingEvent eventObject) {
        SendMessageRequest request = ...
        CountDownLatch latch = new CountDownLatch(1);
        sqs.sendMessageAsync(request, new LoggingEventHandler<SendMessageRequest, SendMessageResult>(this, latch, "Error"));
        // block until the send completes or the flush time runs out
        AppenderExecutors.awaitLatch(this, latch, getMaxFlushTime());
    }
}
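All of this is required to properly handle the following cases:
- Flush the remaining event queue on logback context stop or shutdown hook when the async appender wrapper is used
- Do not block indefinitely when logback's delayed shutdown hook is used
- Provide blocking behavior when the async appender is not used
- Survive the interrupt sent by the async appender's stop, which caused all AWS SDK stream implementations to be interrupted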
The above is used in the open source project Logback extensions, which I maintain.