骆驼 MDC Logback 卷下的陈旧信息

Camel MDC Logback Stale Info Under Volume

我们有一个高负载的 Apache Camel 应用程序,它利用 logback/MDC 来记录信息。正如 logback 文档中预先警告的那样,我们发现一些 MDC 信息在线程上是陈旧的。我发现这个 SO 问题解决了这个问题:

How to use MDC with thread pools?

我们应该如何将其应用于我们的骆驼应用程序以避免陈旧的信息?是否有一种简单的方法可以按照链接问题中的建议将默认的 ThreadPoolExecutor 全局更改为自定义变体?我看到你可以自己为池做这件事,但没有看到执行者的任何例子。请记住,我们的应用程序非常大,每天要处理大量订单——我希望对现有应用程序的影响尽可能小。

我想通了,想 post 我所做的,以防其他人受益。请注意我使用的是 JDK 6/camel2.13.2

  • Camel 有一个使用 DefaultThreadPoolFactoryDefaultExecutorServiceManager。我将默认工厂扩展为 MdcThreadPoolFactory

  • DefaultThreadPoolFactory 有生成 RejectableThreadPoolExecutorRejectableScheduledThreadPoolExecutor 的方法。我将这两个扩展到 Mdc* 版本中,该版本覆盖 execute() 方法以包装 Runnable 并在线程之间传递 MDC 信息(如我原始问题中的 link 所指定)。

  • 我在我的应用程序配置中创建了一个 MdcThreadPoolFactory 的 bean 实例,它由 Camel 自动获取并在 ExecutorServiceManager

MdcThreadPoolExecutor:

package com.mypackage.concurrent

import org.apache.camel.util.concurrent.RejectableThreadPoolExecutor
import org.slf4j.MDC;

import java.util.Map;
import java.util.concurrent.*;

/**
 * A SLF4J MDC-compatible {@link ThreadPoolExecutor}.
 * <p/>
 * In general, MDC is used to store diagnostic information (e.g. a user's session id) in per-thread variables, to facilitate
 * logging. However, although MDC data is passed to thread children, this doesn't work when threads are reused in a
 * thread pool. This is a drop-in replacement for {@link ThreadPoolExecutor} sets MDC data before each task appropriately.
 * <p/>
 * Created by broda20.
 * Date: 10/29/15
 */
public class MdcThreadPoolExecutor extends RejectableThreadPoolExecutor {

    @SuppressWarnings("unchecked")
    private Map<String, Object> getContextForTask() {
        return MDC.getCopyOfContextMap();
    }

    public MdcThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
                                        BlockingQueue<Runnable> workQueue) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
    }

    public MdcThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
                                        BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);
    }

    public MdcThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
                                        BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler);
    }

    public MdcThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
                                        BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
    }

    /**
     * All executions will have MDC injected. {@code ThreadPoolExecutor}'s submission methods ({@code submit()} etc.)
     * all delegate to this.
     */
    @Override
    public void execute(Runnable command) {
        super.execute(wrap(command, getContextForTask()));
    }

    public static Runnable wrap(final Runnable runnable, final Map<String, Object> context) {
        return new Runnable() {
            @Override
            public void run() {
                Map previous = MDC.getCopyOfContextMap();
                if (context == null) {
                    MDC.clear();
                } else {
                    MDC.setContextMap(context);
                }
                try {
                    runnable.run();
                } finally {
                    if (previous == null) {
                        MDC.clear();
                    } else {
                        MDC.setContextMap(previous);
                    }
                }
            }
        };
    }
}

MdcScheduledThreadPoolExecutor:

package com.mypackage.concurrent

import org.apache.camel.util.concurrent.RejectableScheduledThreadPoolExecutor
import org.slf4j.MDC;

import java.util.Map;
import java.util.concurrent.*;

/**
 * A SLF4J MDC-compatible {@link ThreadPoolExecutor}.
 * <p/>
 * In general, MDC is used to store diagnostic information (e.g. a user's session id) in per-thread variables, to facilitate
 * logging. However, although MDC data is passed to thread children, this doesn't work when threads are reused in a
 * thread pool. This is a drop-in replacement for {@link ThreadPoolExecutor} sets MDC data before each task appropriately.
 * <p/>
 * Created by broda20.
 * Date: 10/29/15
 */
public class MdcScheduledThreadPoolExecutor extends RejectableScheduledThreadPoolExecutor {

    @SuppressWarnings("unchecked")
    private Map<String, Object> getContextForTask() {
        return MDC.getCopyOfContextMap();
    }

    public MdcScheduledThreadPoolExecutor(int corePoolSize) {
        super(corePoolSize);
    }

    public MdcScheduledThreadPoolExecutor(int corePoolSize, ThreadFactory threadFactory) {
        super(corePoolSize, threadFactory);
    }

    public MdcScheduledThreadPoolExecutor(int corePoolSize, RejectedExecutionHandler handler) {
        super(corePoolSize, handler);
    }

    public MdcScheduledThreadPoolExecutor(int corePoolSize, ThreadFactory threadFactory, RejectedExecutionHandler handler) {
        super(corePoolSize, threadFactory, handler);
    }

    /**
     * All executions will have MDC injected. {@code ThreadPoolExecutor}'s submission methods ({@code submit()} etc.)
     * all delegate to this.
     */
    @Override
    public void execute(Runnable command) {
        super.execute(wrap(command, getContextForTask()));
    }

    public static Runnable wrap(final Runnable runnable, final Map<String, Object> context) {
        return new Runnable() {
            @Override
            public void run() {
                Map previous = MDC.getCopyOfContextMap();
                if (context == null) {
                    MDC.clear();
                } else {
                    MDC.setContextMap(context);
                }
                try {
                    runnable.run();
                } finally {
                    if (previous == null) {
                        MDC.clear();
                    } else {
                        MDC.setContextMap(previous);
                    }
                }
            }
        };
    }
}

MdcThreadPoolFactory:

package com.mypackage.concurrent

import org.apache.camel.impl.DefaultThreadPoolFactory
import org.apache.camel.spi.ThreadPoolProfile
import org.apache.camel.util.concurrent.SizedScheduledExecutorService
import org.slf4j.MDC;

import java.util.Map;
import java.util.concurrent.*;

public class MdcThreadPoolFactory extends DefaultThreadPoolFactory {

    @SuppressWarnings("unchecked")
    private Map<String, Object> getContextForTask() {
        return MDC.getCopyOfContextMap();
    }


    public ExecutorService newThreadPool(int corePoolSize, int maxPoolSize, long keepAliveTime, TimeUnit timeUnit, int maxQueueSize, boolean allowCoreThreadTimeOut,
                                             RejectedExecutionHandler rejectedExecutionHandler, ThreadFactory threadFactory) throws IllegalArgumentException {

            // the core pool size must be 0 or higher
            if (corePoolSize < 0) {
               throw new IllegalArgumentException("CorePoolSize must be >= 0, was " + corePoolSize);
            }

            // validate max >= core
            if (maxPoolSize < corePoolSize) {
                throw new IllegalArgumentException("MaxPoolSize must be >= corePoolSize, was " + maxPoolSize + " >= " + corePoolSize);
            }

            BlockingQueue<Runnable> workQueue;
            if (corePoolSize == 0 && maxQueueSize <= 0) {
                // use a synchronous queue for direct-handover (no tasks stored on the queue)
                workQueue = new SynchronousQueue<Runnable>();
                // and force 1 as pool size to be able to create the thread pool by the JDK
                corePoolSize = 1;
                maxPoolSize = 1;
            } else if (maxQueueSize <= 0) {
                // use a synchronous queue for direct-handover (no tasks stored on the queue)
                workQueue = new SynchronousQueue<Runnable>();
            } else {
                // bounded task queue to store tasks on the queue
                workQueue = new LinkedBlockingQueue<Runnable>(maxQueueSize);
            }

            ThreadPoolExecutor answer = new MdcThreadPoolExecutor(corePoolSize, maxPoolSize, keepAliveTime, timeUnit, workQueue);
            answer.setThreadFactory(threadFactory);
            answer.allowCoreThreadTimeOut(allowCoreThreadTimeOut);
            if (rejectedExecutionHandler == null) {
                rejectedExecutionHandler = new ThreadPoolExecutor.CallerRunsPolicy();
            }
            answer.setRejectedExecutionHandler(rejectedExecutionHandler);
            return answer;
        }

        @Override
        public ScheduledExecutorService newScheduledThreadPool(ThreadPoolProfile profile, ThreadFactory threadFactory) {
            RejectedExecutionHandler rejectedExecutionHandler = profile.getRejectedExecutionHandler();
            if (rejectedExecutionHandler == null) {
                rejectedExecutionHandler = new ThreadPoolExecutor.CallerRunsPolicy();
            }

            ScheduledThreadPoolExecutor answer = new MdcScheduledThreadPoolExecutor(profile.getPoolSize(), threadFactory, rejectedExecutionHandler);
            //JDK7: answer.setRemoveOnCancelPolicy(true);

            // need to wrap the thread pool in a sized to guard against the problem that the
            // JDK created thread pool has an unbounded queue (see class javadoc), which mean
            // we could potentially keep adding tasks, and run out of memory.
            if (profile.getMaxPoolSize() > 0) {
                return new SizedScheduledExecutorService(answer, profile.getMaxQueueSize());
            } else {
                return answer;
            }
        }
}

最后,bean 实例:

<bean id="mdcThreadPoolFactory" class="com.mypackage.concurrent.MdcThreadPoolFactory"/>