log4j2 custom appender does not stop / exit

Using the boxfuse cloudwatchlogs-java-appender as a starting point, I created a generic version of a log4j2 appender that logs to AWS CloudWatch. However, I am facing an issue where the log4j2 appender does not shut down at all.

Here is my appender plugin, CloudwatchLogsLog4J2Appender.java -

package ...

imports ... 

@Plugin(name = CloudwatchLogsLog4J2Appender.APPENDER_NAME, category = "Core", elementType = Appender.ELEMENT_TYPE, printObject = true)
public class CloudwatchLogsLog4J2Appender extends AbstractAppender {
    static final String APPENDER_NAME = "CloudwatchLogs-Appender";
    private final CloudwatchLogsConfig config = new CloudwatchLogsConfig();
    private BlockingQueue<CloudwatchLogsLogEvent> eventQueue;
    private CloudwatchLogsLogEventPutter putter;
    private long discardedCount;

    public CloudwatchLogsLog4J2Appender(String name, Filter filter, Layout<? extends Serializable> layout) {
        super(name, filter, layout);
    }

    public CloudwatchLogsLog4J2Appender(String name, Filter filter, Layout<? extends Serializable> layout, boolean ignoreExceptions) {
        super(name, filter, layout, ignoreExceptions);
    }

    // Your custom appender needs to declare a factory method
    // annotated with `@PluginFactory`. Log4j will parse the configuration
    // and call this factory method to construct an appender instance with
    // the configured attributes.
    @PluginFactory
    public static CloudwatchLogsLog4J2Appender createAppender(
            @PluginAttribute(value = "name", defaultString = APPENDER_NAME) String name,
            @PluginElement("Filter") final Filter filter,
            @PluginAttribute("debug") Boolean debug,
            @PluginAttribute("stdoutFallback") Boolean stdoutFallback,
            @PluginAttribute("endpoint") String endpoint,
            @PluginAttribute("logGroupName") String logGroupName,
            @PluginAttribute("module") String module,
            @PluginAttribute(value = "maxEventQueueSize", defaultInt = CloudwatchLogsConfig.DEFAULT_MAX_EVENT_QUEUE_SIZE) Integer maxEventQueueSize,
            @PluginAttribute("region") String region,
            @PluginAttribute("flushDelayInMillis") int flushDelayInMillis) {

        System.out.println("CloudwatchLogsLog4J2Appender:createAppender() called...");

        CloudwatchLogsLog4J2Appender appender = new CloudwatchLogsLog4J2Appender(name, filter, null, true);
        if (debug != null) {
            appender.getConfig().setDebug(debug);
        }
        if (stdoutFallback != null) {
            appender.getConfig().setStdoutFallback(stdoutFallback);
        }
        if (endpoint != null) {
            appender.getConfig().setEndpoint(endpoint);
        }
        if (logGroupName != null) {
            appender.getConfig().setLogGroupName(logGroupName);
        }
        if (module != null) {
            appender.getConfig().setModule(module);
        }
        appender.getConfig().setMaxEventQueueSize(maxEventQueueSize);
        if (region != null) {
            appender.getConfig().setRegion(region);
        }
        if (flushDelayInMillis > 0) {
            appender.getConfig().setFlushDelayInMills(flushDelayInMillis);
        }

        return appender;
    }

    /**
     * @return The config of the appender. This instance can be modified to override defaults.
     */
    public CloudwatchLogsConfig getConfig() {
        return config;
    }

    @Override
    public void start() {
        System.out.println("CloudwatchLogsLog4J2Appender:start() called...");
        super.start();
        eventQueue = new LinkedBlockingQueue<>(config.getMaxEventQueueSize());
        putter = CloudwatchLogsLogEventPutter.create(config, eventQueue);
        // Note: this starts a non-daemon thread, so it keeps the JVM alive until terminate() is called
        new Thread(putter).start();
    }

    @Override
    public void stop() {
        System.out.println("CloudwatchLogsLog4J2Appender:stop() called...");
        putter.terminate();
        super.stop();
    }

    @Override
    protected boolean stop(Future<?> future) {
        System.out.println("CloudwatchLogsLog4J2Appender:stop(future) called...");
        putter.terminate();
        return super.stop(future);
    }

    @Override
    public boolean stop(long timeout, TimeUnit timeUnit) {
        System.out.println("CloudwatchLogsLog4J2Appender:stop(timeout, timeunit) called...");
        putter.terminate();
        System.out.println("CloudwatchLogsLog4J2Appender:stop(timeout, timeunit) Done calling terminate()... passing to super");
        return super.stop(timeout, timeUnit);
    }

    /**
     * @return The number of log events that had to be discarded because the event queue was full.
     * If this number is non-zero without having been affected by AWS CloudWatch Logs availability issues,
     * you should consider increasing maxEventQueueSize in the config to allow more log events to be buffered before having to drop them.
     */
    public long getDiscardedCount() {
        return discardedCount;
    }

    @Override
    public void append(LogEvent event) {
        String message = event.getMessage().getFormattedMessage();
        Throwable thrown = event.getThrown();
        while (thrown != null) {
            message += "\n" + dump(thrown);
            thrown = thrown.getCause();
            if (thrown != null) {
                message += "\nCaused by:";
            }
        }

        Marker marker = event.getMarker();
        String eventId = marker == null ? null : marker.getName();

        CloudwatchLogsLogEvent logEvent = new CloudwatchLogsLogEvent(event.getLevel().toString(), event.getLoggerName(), eventId, message, event.getTimeMillis(), event.getThreadName());
        while (!eventQueue.offer(logEvent)) {
            eventQueue.poll();
            discardedCount++;
        }
    }

    private String dump(Throwable throwableProxy) {
        StringBuilder builder = new StringBuilder();
        builder.append(throwableProxy.getClass().getName()).append(": ").append(throwableProxy.getMessage()).append("\n");
        for (StackTraceElement step : throwableProxy.getStackTrace()) {
            builder.append("\t").append(step);
            builder.append("\n");
        }
        return builder.toString();
    }
}

Here is the CloudwatchLogsLogEventPutter -

public class CloudwatchLogsLogEventPutter implements Runnable {
    private static int MAX_FLUSH_DELAY = 500 * 1000 * 1000; // nanoseconds (500 ms), compared against System.nanoTime()
    private static final int MAX_BATCH_COUNT = 10000;
    private static final int MAX_BATCH_SIZE = 1048576;

    private final CloudwatchLogsConfig config;
    private final BlockingQueue<CloudwatchLogsLogEvent> eventQueue;
    private final AWSLogs logsClient;
    private final ObjectMapper objectMapper = new ObjectMapper().setSerializationInclusion(JsonInclude.Include.NON_NULL);
    private final boolean enabled;
    private volatile boolean running; // volatile: written by the appender's stop() thread, read by the putter thread
    private String module;
    private String logGroupName;
    private int batchSize;
    private long lastFlush;
    private List<InputLogEvent> eventBatch;
    private String nextSequenceToken;
    private final AtomicLong processedCount = new AtomicLong(0);

    /**
     * Creates a new EventPutter for the current AWS region.
     *
     * @param config     The config to use.
     * @param eventQueue The event queue to consume from.
     * @return The new EventPutter.
     */
    public static CloudwatchLogsLogEventPutter create(CloudwatchLogsConfig config, BlockingQueue<CloudwatchLogsLogEvent> eventQueue) {
        boolean enabled = config.getRegion() != null || config.getEndpoint() != null;
        AWSLogs logsClient = enabled ? createLogsClient(config) : null;
        CloudwatchLogsLogEventPutter logPutter = new CloudwatchLogsLogEventPutter(config, eventQueue, logsClient, enabled);
        return logPutter;
    }

    /**
     * For internal use only. This constructor lets us switch the AWSLogs implementation for testing.
     */
    public CloudwatchLogsLogEventPutter(CloudwatchLogsConfig config, BlockingQueue<CloudwatchLogsLogEvent> eventQueue,
                                        AWSLogs awsLogs, boolean enabled) {
        this.config = config;
        module = config.getModule();
        this.eventQueue = eventQueue;
        this.enabled = enabled;
        logsClient = awsLogs;

        if (config.getFlushDelayInMills() > 0) {
            // Convert milliseconds to nanoseconds, matching the System.nanoTime() arithmetic in isTimeToFlush()
            MAX_FLUSH_DELAY = config.getFlushDelayInMills() * 1_000_000;
        }

        logGroupName = config.getLogGroupName();
    }

    static AWSLogs createLogsClient(CloudwatchLogsConfig config) {
        AWSLogsClientBuilder builder = AWSLogsClientBuilder.standard();
        if (config.getEndpoint() != null) {
            // Non-AWS mock endpoint
            builder.setCredentials(new AWSStaticCredentialsProvider(new AnonymousAWSCredentials()));
            builder.setEndpointConfiguration(new AwsClientBuilder.EndpointConfiguration(config.getEndpoint(), config.getRegion()));
        } else {
            builder.setRegion(config.getRegion());
        }
        return builder.build();
    }

    /**
     * @return The number of log events that have been processed by this putter.
     */
    public long getProcessedCount() {
        return processedCount.get();
    }

    @Override
    public void run() {
        if (!enabled && !config.isStdoutFallback()) {
            System.out.println("WARNING: AWS CloudWatch Logs appender is disabled (Unable to detect the AWS region and no CloudWatch Logs endpoint specified)");
            return;
        }

        running = true;
        nextSequenceToken = null;
        eventBatch = new ArrayList<>();
        batchSize = 0;
        lastFlush = System.nanoTime();

        printWithTimestamp(new Date(), "Initiating the while loop...");

        while (running) {
            CloudwatchLogsLogEvent event = eventQueue.poll();
            printWithTimestamp(new Date(), "Inside Loopity loop...");
            if (event != null) {
                Map<String, Object> eventMap = new TreeMap<>();
                eventMap.put("context", config.getContext());
                eventMap.put("module", config.getModule());
                eventMap.put("level", event.getLevel());
                eventMap.put("event", event.getEvent());
                eventMap.put("message", event.getMessage());
                eventMap.put("logger", event.getLogger());
                eventMap.put("thread", event.getThread());

                String eventJson;
                try {
                    eventJson = toJson(eventMap);
                } catch (JsonProcessingException e) {
                    printWithTimestamp(new Date(), "Unable to serialize log event: " + eventMap);
                    continue;
                }

                // Source: http://docs.aws.amazon.com/AmazonCloudWatchLogs/latest/APIReference/API_PutLogEvents.html
                // The maximum batch size is 1,048,576 bytes,
                int eventSize =
                        // and this size is calculated as the sum of all event messages in UTF-8,
                        eventJson.getBytes(StandardCharsets.UTF_8).length
                                // plus 26 bytes for each log event.
                                + 26;

                if (eventSize > MAX_BATCH_SIZE) {
                    printWithTimestamp(new Date(), "Unable to send log event as its size (" + eventSize + " bytes)"
                            + " exceeds the maximum size supported by AWS CloudWatch Logs (" + MAX_BATCH_SIZE + " bytes): " + eventMap);
                    continue;
                }

                if (config.isDebug()) {
                    printWithTimestamp(new Date(), "Event Size: " + eventSize + " bytes, Batch Size: " + batchSize
                            + " bytes, Batch Count: " + eventBatch.size() + ", Event: " + eventJson);
                }

                if ((eventBatch.size() + 1) >= MAX_BATCH_COUNT || (batchSize + eventSize) >= MAX_BATCH_SIZE) {
                    flush();
                }

                eventBatch.add(new InputLogEvent().withMessage(eventJson).withTimestamp(event.getTimestamp()));
                batchSize += eventSize;
                printWithTimestamp(new Date(event.getTimestamp()), "batchSize = " + batchSize);
            } else {
                printWithTimestamp(new Date(), "No events, just flush attempts...");
                if (!eventBatch.isEmpty() && isTimeToFlush()) {
                    printWithTimestamp(new Date(), "eventbatch is not empty and its time to flush");
                    flush();
                }
                try {
                    printWithTimestamp(new Date(), "going to sleep...");
                    Thread.sleep(100);
                    printWithTimestamp(new Date(), "done sleeping...");
                } catch (InterruptedException e) {
                    printWithTimestamp(new Date(), "Exception while flusing and sleeping...");
                    running = false;
                }
            }
        }

        printWithTimestamp(new Date(), "Done with that while loop...");
    }

    private void finalFlush() {
        printWithTimestamp(new Date(), "finalFlush() called...");
        if (!eventBatch.isEmpty()) {
            printWithTimestamp(new Date(), "finalFlush() ==> flush()...");
            flush();
            printWithTimestamp(new Date(), "finalFlush() ==> flush()... DONE");
        }
        try {
            printWithTimestamp(new Date(), "finalFlush() ==> Sleeping...");
            Thread.sleep(100);
            printWithTimestamp(new Date(), "finalFlush() ==> Sleeping... DONE");
        } catch (InterruptedException e) {
            printWithTimestamp(new Date(), "Exception while finalFlusing and sleeping... setting running to false");
            running = false;
        }
    }

    private boolean isTimeToFlush() {
        return lastFlush <= (System.nanoTime() - MAX_FLUSH_DELAY);
    }

    private void flush() {

        printWithTimestamp(new Date(),"flush() called");

        eventBatch.sort(Comparator.comparing(InputLogEvent::getTimestamp));

        if (config.isStdoutFallback()) {
            for (InputLogEvent event : eventBatch) {
                printWithTimestamp(new Date(event.getTimestamp()), logGroupName + " " + module + " " + event.getMessage());
            }
        } else {
            int retries = 15;
            do {
                printWithTimestamp(new Date(),"flush() - prepping PutLogEventsRequest");
                PutLogEventsRequest request =
                        new PutLogEventsRequest(logGroupName, module, eventBatch).withSequenceToken(nextSequenceToken);
                try {
                    long start = 0;
                    if (config.isDebug()) {
                        start = System.nanoTime();
                    }
                    PutLogEventsResult result = logsClient.putLogEvents(request);
                    if (config.isDebug()) {
                        long stop = System.nanoTime();
                        long elapsed = (stop - start) / 1000000;
                        printWithTimestamp(new Date(), "Sending " + eventBatch.size() + " events took " + elapsed + " ms");
                    }
                    processedCount.addAndGet(request.getLogEvents().size());
                    nextSequenceToken = result.getNextSequenceToken();
                    break;
                } catch (DataAlreadyAcceptedException e) {
                    nextSequenceToken = e.getExpectedSequenceToken();
                    printWithTimestamp(new Date(),"flush() - received DataAlreadyAcceptedException");
                } catch (InvalidSequenceTokenException e) {
                    nextSequenceToken = e.getExpectedSequenceToken();
                    printWithTimestamp(new Date(),"flush() - received InvalidSequenceTokenException");
                } catch (ResourceNotFoundException e) {
                    printWithTimestamp(new Date(), "Unable to send logs to AWS CloudWatch Logs at "
                            + logGroupName + ">" + module + " (" + e.getErrorMessage() + "). Dropping log events batch ...");
                    break;
                } catch (SdkClientException e) {
                    try {
                        printWithTimestamp(new Date(),"flush() - received SDKClientException. Sleeping to retry");
                        Thread.sleep(1000);
                        printWithTimestamp(new Date(),"flush() - received SDKClientException. Sleeping DONE");
                    } catch (InterruptedException e1) {
                        System.out.println("SDKException while pushing logs to cloudwatch ...");
                    }
                    if (--retries > 0) {
                        printWithTimestamp(new Date(), "Attempt " + (15 - retries) + ": Unable to send logs to AWS CloudWatch Logs ("
                                + e.getMessage() + "). Retrying ...");
                    }
                }
            } while (retries > 0); //  && eventBatch.size() > 0
        }
        eventBatch = new ArrayList<>();
        batchSize = 0;
        lastFlush = System.nanoTime();
    }

    /* private -> for testing */
    String toJson(Map<String, Object> eventMap) throws JsonProcessingException {
        // Compensate for https://github.com/FasterXML/jackson-databind/issues/1442
        Map<String, Object> nonNullMap = new TreeMap<>();
        for (Map.Entry<String, Object> entry : eventMap.entrySet()) {
            if (entry.getValue() != null) {
                nonNullMap.put(entry.getKey(), entry.getValue());
            }
        }
        return objectMapper.writeValueAsString(nonNullMap);
    }

    private void printWithTimestamp(Date date, String str) {
        System.out.println(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(date) + " " + str);
    }

    public void terminate() {
        printWithTimestamp(new Date(),"terminate() ==> finalFlush()");
        //finalFlush();
        printWithTimestamp(new Date(),"terminate() ==> finalFlush() DONE. Setting running=false");
        running = false;
    }
}

And the CloudwatchLogsLogEvent -

public class CloudwatchLogsLogEvent {
    private final String level;
    private final String logger;
    private final String event;
    private final String message;
    private final long timestamp;
    private final String thread;

    public CloudwatchLogsLogEvent(String level, String logger, String event, String message, long timestamp, String thread) {
        this.level = level;
        this.logger = logger;
        this.event = event;
        this.message = message;
        this.timestamp = timestamp;
        this.thread = thread;
    }

    public String getLevel() {
        return level;
    }

    public String getLogger() {
        return logger;
    }

    public String getEvent() {
        return event;
    }

    public String getMessage() {
        return message;
    }

    public long getTimestamp() {
        return timestamp;
    }

    public String getThread() {
        return thread;
    }
}

Finally, here is the sample log4j2.xml configuration -

<?xml version="1.0" encoding="UTF-8"?>
<Configuration status="trace" package="com.cloudwatchlogs.appender.log4j2">
    <Appenders>
        <CloudwatchLogs-Appender name="myCloudWatchLogger">
            <region>us-west-2</region>
            <logGroupName>myCloudWatchLogGroup</logGroupName>
            <module>myCloudWatchLogStream</module>
            <flushDelayInMillis>1</flushDelayInMillis>

            <!-- Optional config parameters -->

            <!-- Whether to fall back to stdout instead of disabling the appender when running outside of a Boxfuse instance. Default: false -->
            <stdoutFallback>false</stdoutFallback>

            <!-- The maximum size of the async log event queue. Default: 1000000.
                 Increase to avoid dropping log events at very high throughput.
                 Decrease to reduce maximum memory usage at the risk if the occasional log event drop when it gets full. -->
            <maxEventQueueSize>1000000</maxEventQueueSize>
        </CloudwatchLogs-Appender>

        <Console name="console" target="SYSTEM_OUT">
            <PatternLayout pattern="%d{HH:mm:ss.SSS} [%t] %-5level %logger{36} - %msg%n"/>
        </Console>
    </Appenders>
    <Loggers>
        <Root level="DEBUG">
            <AppenderRef ref="console"/>
        </Root>
        <Logger name="com.mycompany.src" level="DEBUG" additivity="false">
            <AppenderRef ref="myCloudWatchLogger" level="DEBUG"/>
        </Logger>
    </Loggers>
</Configuration>

I tried using this configuration in a very simple application -

package ...
import ...


public class MyApp
{
    private static Logger logger = LogManager.getLogger(MyApp.class);

    AmazonS3 s3Client = null;
    AmazonDynamoDB dynamoDBClient = null;

    MyApp() {
        initS3Client(new DefaultAWSCredentialsProviderChain());
    }

    public void listObjects(String bucketName) {
        ObjectListing objectListing = s3Client.listObjects(bucketName);
        logger.info("Listing objects in bucket - " + bucketName);
        List<String> commonPrefixes = objectListing.getCommonPrefixes();
        commonPrefixes.stream().forEach(s -> System.out.println("commonPrefix - " + s));
        List<S3ObjectSummary> objectSummaries = objectListing.getObjectSummaries();
        for(S3ObjectSummary objectSummary : objectSummaries) {
            logger.info("key = " + objectSummary.getKey());
            logger.info("ETag = " + objectSummary.getETag());
            logger.info("Size = " + objectSummary.getSize());
            logger.info("Storage Class = " + objectSummary.getStorageClass());
            logger.info("Last Modified = " + objectSummary.getLastModified());
        }

        s3Client.shutdown();
    }

    public static void main(String[] args){
        MyApp myApp = new MyApp();
        myApp.listObjects("test-bucket");
    }

    void initS3Client(AWSCredentialsProvider credentialsProvider) {

        AmazonS3ClientBuilder clientBuilder = AmazonS3ClientBuilder.standard()
                .withCredentials(credentialsProvider)
                .withRegion(Regions.US_WEST_2);

        s3Client = clientBuilder.build();
    }

    void initDynamoDBClient(AWSCredentialsProvider credentialsProvider) {

        AmazonDynamoDBClientBuilder clientBuilder = AmazonDynamoDBClientBuilder.standard()
                .withCredentials(credentialsProvider)
                .withRegion(Regions.US_WEST_2);

        dynamoDBClient = clientBuilder.build();
    }

}

When I run MyApp.java, I see that after all the relevant logs have been streamed to CloudWatch, the while loop in the run() method of CloudwatchLogsLogEventPutter.java does not terminate. I understand that it is a separate thread that runs forever, but shouldn't log4j2 invoke the stop() method on its own as part of the lifecycle once the application-related tasks in the MyApp.main() method are complete?

If I hit Ctrl+C, I see the overridden stop() method below from CloudwatchLogsLog4J2Appender.java being called -

public boolean stop(long timeout, TimeUnit timeUnit)

I am not sure where I am going wrong, and documentation on the various lifecycle methods of an Appender, and on the lifecycle itself, seems scarce. This is the first time I am writing an appender. Any help is appreciated. Thanks.

Update 1: Sample log file - https://gist.github.com/dev-usa/822309bcd8b4f8a5fb0f4e1eca70d67e

So, I have resolved a couple of issues with this implementation.

  1. To shut down the logger gracefully, LogManager.shutdown() needs to be called from the application that uses the appender. At the time of posting this question, the application was not calling that shutdown method; calling it behaves just like the Ctrl+C behavior I described above. Once that was in place, I saw the logging framework shutting down as expected and the appender being stopped, as the sketch below shows.
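
     For illustration, here is a minimal sketch of this change applied to the MyApp example above (assuming only log4j2's standard org.apache.logging.log4j.LogManager API, which MyApp already imports) -

    public static void main(String[] args) {
        MyApp myApp = new MyApp();
        myApp.listObjects("test-bucket");
        // Explicitly stop the LoggerContext. log4j2 then calls stop() on every
        // appender, giving CloudwatchLogsLog4J2Appender a chance to terminate
        // the putter thread and flush any remaining events.
        LogManager.shutdown();
    }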

  2. The next issue I faced was that I was losing some logs that were never sent to CloudWatch. After some debugging, I found that the while loop in the run() method of CloudwatchLogsLogEventPutter.java was fetching only one log event per iteration. I updated the design to use the drainTo method on the BlockingQueue to fetch the whole list of pending events and push them in one go, which drastically reduces the number of loop iterations needed to push events to CloudWatch. See the updated implementation below -

    while (running) {
    
        List<CloudwatchLogsLogEvent> logEvents = new ArrayList<>();
        eventQueue.drainTo(logEvents);
    
        printWithTimestamp( "Draining events from eventLoop. No. of events received = " + logEvents.size());
    
        if(logEvents.size() > 0) {
            printWithTimestamp( "Translating " + logEvents.size() + " log events to CloudWatch Logs...");
            logEvents.stream().forEach(logEvent -> translateLogEventToCloudWatchLog(logEvent));
            printWithTimestamp( "Translating " + logEvents.size() + " log events to CloudWatch Logs... DONE");
        }
    
        boolean timeToFlush = isTimeToFlush();
        printWithTimestamp( "isTimeToFlush()   = " + timeToFlush);
        printWithTimestamp( "eventBatch.size() = " + eventBatch.size());
    
        if (!eventBatch.isEmpty() && timeToFlush) {
            printWithTimestamp( "Event Batch is NOT empty and it's time to flush");
            flush();
        }
    
        try {
            printWithTimestamp( "going to sleep...");
            Thread.sleep(100);
            printWithTimestamp( "done sleeping...");
        } catch (InterruptedException e) {
            printWithTimestamp( "Exception while flushing and sleeping...");
            running = false;
        }
    }
    
  3. Finally, while packaging my application as a fat jar, I also ran into the issue of the log4j2 configuration and the appender not being recognized on the classpath; applying the suggested solutions resolved my issue. A sketch of one such solution follows this list.
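
For reference, here is a hedged sketch of one commonly suggested fix for the fat-jar issue, assuming Maven with the maven-shade-plugin: log4j2 discovers @Plugin classes via a Log4j2Plugins.dat cache file, and the cache files from all shaded jars need to be merged by an extra transformer. The com.github.edwgiz artifact below is a community-maintained transformer; treat the exact coordinates and versions as assumptions to verify for your setup -

<plugin>
    <groupId>org.apache.maven.plugins</groupId>
    <artifactId>maven-shade-plugin</artifactId>
    <version>3.2.4</version>
    <dependencies>
        <dependency>
            <!-- Community transformer that merges the Log4j2Plugins.dat plugin caches -->
            <groupId>com.github.edwgiz</groupId>
            <artifactId>maven-shade-plugin.log4j2-cachefile-transformer</artifactId>
            <version>2.13.0</version>
        </dependency>
    </dependencies>
    <executions>
        <execution>
            <phase>package</phase>
            <goals>
                <goal>shade</goal>
            </goals>
            <configuration>
                <transformers>
                    <transformer implementation="com.github.edwgiz.mavenShadePlugin.log4j2CacheTransformer.PluginsCacheFileTransformer"/>
                </transformers>
            </configuration>
        </execution>
    </executions>
</plugin>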

Good luck!