From ae42ddd35fe1e7ee4a712c0cf97846b362e78564 Mon Sep 17 00:00:00 2001 From: karl-sy <71377602+karl-sy@users.noreply.github.com> Date: Mon, 26 Jun 2023 13:42:50 +0800 Subject: [PATCH] Make default JUL-based logging asynchronous (#3136) * fix issues/1463 : Make default JUL-based logging asynchronous --- .../csp/sentinel/log/jul/ConsoleHandler.java | 89 +++++++++++++++++-- .../sentinel/log/jul/DateFileLogHandler.java | 69 +++++++++++++- .../sentinel/log/jul/ConsoleHandlerTest.java | 73 ++++++++++++++- 3 files changed, 222 insertions(+), 9 deletions(-) diff --git a/sentinel-core/src/main/java/com/alibaba/csp/sentinel/log/jul/ConsoleHandler.java b/sentinel-core/src/main/java/com/alibaba/csp/sentinel/log/jul/ConsoleHandler.java index 589b0949..b74def65 100644 --- a/sentinel-core/src/main/java/com/alibaba/csp/sentinel/log/jul/ConsoleHandler.java +++ b/sentinel-core/src/main/java/com/alibaba/csp/sentinel/log/jul/ConsoleHandler.java @@ -15,7 +15,16 @@ */ package com.alibaba.csp.sentinel.log.jul; +import com.alibaba.csp.sentinel.concurrent.NamedThreadFactory; + import java.io.UnsupportedEncodingException; +import java.util.ArrayDeque; +import java.util.Queue; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.RejectedExecutionHandler; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.TimeUnit; import java.util.logging.*; /** @@ -43,9 +52,21 @@ class ConsoleHandler extends Handler { */ private StreamHandler stderrHandler; + private ExecutorService executor; + public ConsoleHandler() { this.stdoutHandler = new StreamHandler(System.out, new CspFormatter()); this.stderrHandler = new StreamHandler(System.err, new CspFormatter()); + + int corePoolSize = 1; + int maximumPoolSize = 1; + long keepAliveTime = 0; + /**insure the log can be recorded*/ + int queueSize = 1024; + RejectedExecutionHandler handler = new LogRejectedExecutionHandler(); + executor = new ThreadPoolExecutor(corePoolSize, maximumPoolSize, + keepAliveTime, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(queueSize), + new NamedThreadFactory("sentinel-console-log-executor", true), handler); } @Override @@ -62,13 +83,7 @@ class ConsoleHandler extends Handler { @Override public void publish(LogRecord record) { - if (record.getLevel().intValue() >= Level.WARNING.intValue()) { - stderrHandler.publish(record); - stderrHandler.flush(); - } else { - stdoutHandler.publish(record); - stdoutHandler.flush(); - } + executor.execute(new LogTask(record,stdoutHandler,stderrHandler)); } @Override @@ -79,7 +94,67 @@ class ConsoleHandler extends Handler { @Override public void close() throws SecurityException { + /**not need to record log if process is killed.*/ + executor.shutdown(); stdoutHandler.close(); stderrHandler.close(); } + + public ExecutorService getExecutor() { + return executor; + } + + static class LogRejectedExecutionHandler implements RejectedExecutionHandler { + + /** + * The period of logged rejected records. + */ + private final long recordPeriod; + + private Long lastRecordTime; + + public LogRejectedExecutionHandler() { + String DEFAULT_REJECTED_RECORD_PERIOD = "60000"; + String REJECTED_RECORD_PERIOD_KEY = "sentinel.rejected.record.period"; + lastRecordTime = null; + recordPeriod = Long.parseLong(System.getProperty(REJECTED_RECORD_PERIOD_KEY, DEFAULT_REJECTED_RECORD_PERIOD)); + } + + public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { + long currentTimestamp = System.currentTimeMillis(); + if (lastRecordTime == null || currentTimestamp - lastRecordTime > recordPeriod) { + System.err.println("Failed to log sentinel record with console, rejected"); + lastRecordTime = currentTimestamp; + } + } + + } + + static class LogTask implements Runnable { + private final LogRecord record; + private final StreamHandler stdoutHandler; + private final StreamHandler stderrHandler; + + public LogTask(LogRecord record,StreamHandler stdoutHandler,StreamHandler stderrHandler) { + this.record = record; + this.stdoutHandler = stdoutHandler; + this.stderrHandler = stderrHandler; + } + + public void run() { + if (record.getLevel().intValue() >= Level.WARNING.intValue()) { + stderrHandler.publish(record); + stderrHandler.flush(); + } else { + stdoutHandler.publish(record); + stdoutHandler.flush(); + } + } + + public LogRecord getRecord() { + return record; + } + + } + } diff --git a/sentinel-core/src/main/java/com/alibaba/csp/sentinel/log/jul/DateFileLogHandler.java b/sentinel-core/src/main/java/com/alibaba/csp/sentinel/log/jul/DateFileLogHandler.java index 30a5ebee..0fb13a4f 100755 --- a/sentinel-core/src/main/java/com/alibaba/csp/sentinel/log/jul/DateFileLogHandler.java +++ b/sentinel-core/src/main/java/com/alibaba/csp/sentinel/log/jul/DateFileLogHandler.java @@ -15,11 +15,20 @@ */ package com.alibaba.csp.sentinel.log.jul; +import com.alibaba.csp.sentinel.concurrent.NamedThreadFactory; + import java.io.File; import java.io.IOException; import java.text.SimpleDateFormat; +import java.util.ArrayDeque; import java.util.Calendar; import java.util.Date; +import java.util.Queue; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.RejectedExecutionHandler; +import java.util.concurrent.TimeUnit; import java.util.logging.FileHandler; import java.util.logging.Formatter; import java.util.logging.Handler; @@ -34,6 +43,8 @@ class DateFileLogHandler extends Handler { } }; + private ExecutorService executor; + private volatile FileHandler handler; private final String pattern; @@ -55,10 +66,22 @@ class DateFileLogHandler extends Handler { this.append = append; rotateDate(); this.initialized = true; + + int corePoolSize = 1; + int maximumPoolSize = 1; + long keepAliveTime = 0; + /**insure the log can be recorded*/ + int queueSize = 1024; + RejectedExecutionHandler handler = new LogRejectedExecutionHandler(); + executor = new ThreadPoolExecutor(corePoolSize, maximumPoolSize, + keepAliveTime, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(queueSize), + new NamedThreadFactory("sentinel-datafile-log-executor", true), handler); } @Override public void close() throws SecurityException { + /**not need to record log if process is killed.*/ + executor.shutdown(); handler.close(); } @@ -80,7 +103,8 @@ class DateFileLogHandler extends Handler { String msg = record.getMessage(); record.setMessage("missed file rolling at: " + new Date(endDate) + "\n" + msg); } - handler.publish(record); + + executor.execute(new LogTask(record,handler)); } private boolean shouldRotate(LogRecord record) { @@ -145,4 +169,47 @@ class DateFileLogHandler extends Handler { } } + static class LogRejectedExecutionHandler implements RejectedExecutionHandler { + /** + * The period of logged rejected records. + */ + private final long recordPeriod; + + private Long lastRecordTime; + + public LogRejectedExecutionHandler() { + String DEFAULT_REJECTED_RECORD_PERIOD = "60000"; + String REJECTED_RECORD_PERIOD_KEY = "sentinel.rejected.record.period"; + lastRecordTime = null; + recordPeriod = Long.parseLong(System.getProperty(REJECTED_RECORD_PERIOD_KEY, DEFAULT_REJECTED_RECORD_PERIOD)); + } + + public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { + long currentTimestamp = System.currentTimeMillis(); + if (lastRecordTime == null || currentTimestamp - lastRecordTime > recordPeriod) { + System.err.println("Failed to log sentinel record with datafile, rejected"); + lastRecordTime = currentTimestamp; + } + } + } + + static class LogTask implements Runnable { + private final LogRecord record; + private final FileHandler handler; + + public LogTask(LogRecord record,FileHandler handler) { + this.record = record; + this.handler = handler; + } + + public void run() { + handler.publish(record); + } + + public LogRecord getRecord() { + return record; + } + + } + } diff --git a/sentinel-core/src/test/java/com/alibaba/csp/sentinel/log/jul/ConsoleHandlerTest.java b/sentinel-core/src/test/java/com/alibaba/csp/sentinel/log/jul/ConsoleHandlerTest.java index b30d9b2a..4783a058 100644 --- a/sentinel-core/src/test/java/com/alibaba/csp/sentinel/log/jul/ConsoleHandlerTest.java +++ b/sentinel-core/src/test/java/com/alibaba/csp/sentinel/log/jul/ConsoleHandlerTest.java @@ -19,6 +19,7 @@ import org.junit.Test; import java.io.ByteArrayOutputStream; import java.io.PrintStream; +import java.util.concurrent.TimeUnit; import java.util.logging.Level; import java.util.logging.LogRecord; @@ -35,7 +36,7 @@ import static org.junit.Assert.*; public class ConsoleHandlerTest { @Test - public void testPublish() { + public void testInfoPublish() { ByteArrayOutputStream baosOut = new ByteArrayOutputStream(); System.setOut(new PrintStream(baosOut)); @@ -50,35 +51,105 @@ public class ConsoleHandlerTest { // Test INFO level, should log to stdout logRecord = new LogRecord(Level.INFO, "test info message"); consoleHandler.publish(logRecord); + try { + consoleHandler.getExecutor().shutdown(); + consoleHandler.getExecutor().awaitTermination(1, TimeUnit.SECONDS); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + consoleHandler.close(); assertEquals(cspFormatter.format(logRecord), baosOut.toString()); assertEquals("", baosErr.toString()); baosOut.reset(); baosErr.reset(); + } + + @Test + public void testWarnPublish() { + ByteArrayOutputStream baosOut = new ByteArrayOutputStream(); + System.setOut(new PrintStream(baosOut)); + + ByteArrayOutputStream baosErr = new ByteArrayOutputStream(); + System.setErr(new PrintStream(baosErr)); + + CspFormatter cspFormatter = new CspFormatter(); + ConsoleHandler consoleHandler = new ConsoleHandler(); + + LogRecord logRecord; // Test INFO level, should log to stderr logRecord = new LogRecord(Level.WARNING, "test warning message"); consoleHandler.publish(logRecord); + try { + consoleHandler.getExecutor().shutdown(); + consoleHandler.getExecutor().awaitTermination(1, TimeUnit.SECONDS); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + consoleHandler.close(); assertEquals(cspFormatter.format(logRecord), baosErr.toString()); assertEquals("", baosOut.toString()); baosOut.reset(); baosErr.reset(); + } + + @Test + public void testFinePublish() { + ByteArrayOutputStream baosOut = new ByteArrayOutputStream(); + System.setOut(new PrintStream(baosOut)); + + ByteArrayOutputStream baosErr = new ByteArrayOutputStream(); + System.setErr(new PrintStream(baosErr)); + + CspFormatter cspFormatter = new CspFormatter(); + ConsoleHandler consoleHandler = new ConsoleHandler(); + + LogRecord logRecord; // Test FINE level, no log by default // Default log level is INFO, to log FINE message to stdout, add following config in $JAVA_HOME/jre/lib/logging.properties // java.util.logging.StreamHandler.level=FINE logRecord = new LogRecord(Level.FINE, "test fine message"); consoleHandler.publish(logRecord); + try { + consoleHandler.getExecutor().shutdown(); + consoleHandler.getExecutor().awaitTermination(1, TimeUnit.SECONDS); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + consoleHandler.close(); assertEquals("", baosOut.toString()); assertEquals("", baosErr.toString()); baosOut.reset(); baosErr.reset(); + } + @Test + public void testSeverePublish() { + ByteArrayOutputStream baosOut = new ByteArrayOutputStream(); + System.setOut(new PrintStream(baosOut)); + + ByteArrayOutputStream baosErr = new ByteArrayOutputStream(); + System.setErr(new PrintStream(baosErr)); + + CspFormatter cspFormatter = new CspFormatter(); + ConsoleHandler consoleHandler = new ConsoleHandler(); + + LogRecord logRecord; // Test SEVERE level, should log to stderr logRecord = new LogRecord(Level.SEVERE, "test severe message"); consoleHandler.publish(logRecord); + try { + consoleHandler.getExecutor().shutdown(); + consoleHandler.getExecutor().awaitTermination(1, TimeUnit.SECONDS); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + consoleHandler.close(); assertEquals(cspFormatter.format(logRecord), baosErr.toString()); assertEquals("", baosOut.toString()); baosOut.reset(); baosErr.reset(); } + }