Make default JUL-based logging asynchronous (#3136)

* fix issues/1463 : Make default JUL-based logging asynchronous
This commit is contained in:
karl-sy 2023-06-26 13:42:50 +08:00 committed by LearningGp
parent c4997eee15
commit ae42ddd35f
3 changed files with 222 additions and 9 deletions

View File

@ -15,7 +15,16 @@
*/
package com.alibaba.csp.sentinel.log.jul;
import com.alibaba.csp.sentinel.concurrent.NamedThreadFactory;
import java.io.UnsupportedEncodingException;
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.logging.*;
/**
@ -43,9 +52,21 @@ class ConsoleHandler extends Handler {
*/
private StreamHandler stderrHandler;
private ExecutorService executor;
public ConsoleHandler() {
this.stdoutHandler = new StreamHandler(System.out, new CspFormatter());
this.stderrHandler = new StreamHandler(System.err, new CspFormatter());
int corePoolSize = 1;
int maximumPoolSize = 1;
long keepAliveTime = 0;
/**insure the log can be recorded*/
int queueSize = 1024;
RejectedExecutionHandler handler = new LogRejectedExecutionHandler();
executor = new ThreadPoolExecutor(corePoolSize, maximumPoolSize,
keepAliveTime, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(queueSize),
new NamedThreadFactory("sentinel-console-log-executor", true), handler);
}
@Override
@ -62,13 +83,7 @@ class ConsoleHandler extends Handler {
@Override
public void publish(LogRecord record) {
if (record.getLevel().intValue() >= Level.WARNING.intValue()) {
stderrHandler.publish(record);
stderrHandler.flush();
} else {
stdoutHandler.publish(record);
stdoutHandler.flush();
}
executor.execute(new LogTask(record,stdoutHandler,stderrHandler));
}
@Override
@ -79,7 +94,67 @@ class ConsoleHandler extends Handler {
@Override
public void close() throws SecurityException {
/**not need to record log if process is killed.*/
executor.shutdown();
stdoutHandler.close();
stderrHandler.close();
}
public ExecutorService getExecutor() {
return executor;
}
static class LogRejectedExecutionHandler implements RejectedExecutionHandler {
/**
* The period of logged rejected records.
*/
private final long recordPeriod;
private Long lastRecordTime;
public LogRejectedExecutionHandler() {
String DEFAULT_REJECTED_RECORD_PERIOD = "60000";
String REJECTED_RECORD_PERIOD_KEY = "sentinel.rejected.record.period";
lastRecordTime = null;
recordPeriod = Long.parseLong(System.getProperty(REJECTED_RECORD_PERIOD_KEY, DEFAULT_REJECTED_RECORD_PERIOD));
}
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
long currentTimestamp = System.currentTimeMillis();
if (lastRecordTime == null || currentTimestamp - lastRecordTime > recordPeriod) {
System.err.println("Failed to log sentinel record with console, rejected");
lastRecordTime = currentTimestamp;
}
}
}
static class LogTask implements Runnable {
private final LogRecord record;
private final StreamHandler stdoutHandler;
private final StreamHandler stderrHandler;
public LogTask(LogRecord record,StreamHandler stdoutHandler,StreamHandler stderrHandler) {
this.record = record;
this.stdoutHandler = stdoutHandler;
this.stderrHandler = stderrHandler;
}
public void run() {
if (record.getLevel().intValue() >= Level.WARNING.intValue()) {
stderrHandler.publish(record);
stderrHandler.flush();
} else {
stdoutHandler.publish(record);
stdoutHandler.flush();
}
}
public LogRecord getRecord() {
return record;
}
}
}

View File

@ -15,11 +15,20 @@
*/
package com.alibaba.csp.sentinel.log.jul;
import com.alibaba.csp.sentinel.concurrent.NamedThreadFactory;
import java.io.File;
import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.ArrayDeque;
import java.util.Calendar;
import java.util.Date;
import java.util.Queue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.TimeUnit;
import java.util.logging.FileHandler;
import java.util.logging.Formatter;
import java.util.logging.Handler;
@ -34,6 +43,8 @@ class DateFileLogHandler extends Handler {
}
};
private ExecutorService executor;
private volatile FileHandler handler;
private final String pattern;
@ -55,10 +66,22 @@ class DateFileLogHandler extends Handler {
this.append = append;
rotateDate();
this.initialized = true;
int corePoolSize = 1;
int maximumPoolSize = 1;
long keepAliveTime = 0;
/**insure the log can be recorded*/
int queueSize = 1024;
RejectedExecutionHandler handler = new LogRejectedExecutionHandler();
executor = new ThreadPoolExecutor(corePoolSize, maximumPoolSize,
keepAliveTime, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(queueSize),
new NamedThreadFactory("sentinel-datafile-log-executor", true), handler);
}
@Override
public void close() throws SecurityException {
/**not need to record log if process is killed.*/
executor.shutdown();
handler.close();
}
@ -80,7 +103,8 @@ class DateFileLogHandler extends Handler {
String msg = record.getMessage();
record.setMessage("missed file rolling at: " + new Date(endDate) + "\n" + msg);
}
handler.publish(record);
executor.execute(new LogTask(record,handler));
}
private boolean shouldRotate(LogRecord record) {
@ -145,4 +169,47 @@ class DateFileLogHandler extends Handler {
}
}
static class LogRejectedExecutionHandler implements RejectedExecutionHandler {
/**
* The period of logged rejected records.
*/
private final long recordPeriod;
private Long lastRecordTime;
public LogRejectedExecutionHandler() {
String DEFAULT_REJECTED_RECORD_PERIOD = "60000";
String REJECTED_RECORD_PERIOD_KEY = "sentinel.rejected.record.period";
lastRecordTime = null;
recordPeriod = Long.parseLong(System.getProperty(REJECTED_RECORD_PERIOD_KEY, DEFAULT_REJECTED_RECORD_PERIOD));
}
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
long currentTimestamp = System.currentTimeMillis();
if (lastRecordTime == null || currentTimestamp - lastRecordTime > recordPeriod) {
System.err.println("Failed to log sentinel record with datafile, rejected");
lastRecordTime = currentTimestamp;
}
}
}
static class LogTask implements Runnable {
private final LogRecord record;
private final FileHandler handler;
public LogTask(LogRecord record,FileHandler handler) {
this.record = record;
this.handler = handler;
}
public void run() {
handler.publish(record);
}
public LogRecord getRecord() {
return record;
}
}
}

View File

@ -19,6 +19,7 @@ import org.junit.Test;
import java.io.ByteArrayOutputStream;
import java.io.PrintStream;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.LogRecord;
@ -35,7 +36,7 @@ import static org.junit.Assert.*;
public class ConsoleHandlerTest {
@Test
public void testPublish() {
public void testInfoPublish() {
ByteArrayOutputStream baosOut = new ByteArrayOutputStream();
System.setOut(new PrintStream(baosOut));
@ -50,35 +51,105 @@ public class ConsoleHandlerTest {
// Test INFO level, should log to stdout
logRecord = new LogRecord(Level.INFO, "test info message");
consoleHandler.publish(logRecord);
try {
consoleHandler.getExecutor().shutdown();
consoleHandler.getExecutor().awaitTermination(1, TimeUnit.SECONDS);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
consoleHandler.close();
assertEquals(cspFormatter.format(logRecord), baosOut.toString());
assertEquals("", baosErr.toString());
baosOut.reset();
baosErr.reset();
}
@Test
public void testWarnPublish() {
ByteArrayOutputStream baosOut = new ByteArrayOutputStream();
System.setOut(new PrintStream(baosOut));
ByteArrayOutputStream baosErr = new ByteArrayOutputStream();
System.setErr(new PrintStream(baosErr));
CspFormatter cspFormatter = new CspFormatter();
ConsoleHandler consoleHandler = new ConsoleHandler();
LogRecord logRecord;
// Test INFO level, should log to stderr
logRecord = new LogRecord(Level.WARNING, "test warning message");
consoleHandler.publish(logRecord);
try {
consoleHandler.getExecutor().shutdown();
consoleHandler.getExecutor().awaitTermination(1, TimeUnit.SECONDS);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
consoleHandler.close();
assertEquals(cspFormatter.format(logRecord), baosErr.toString());
assertEquals("", baosOut.toString());
baosOut.reset();
baosErr.reset();
}
@Test
public void testFinePublish() {
ByteArrayOutputStream baosOut = new ByteArrayOutputStream();
System.setOut(new PrintStream(baosOut));
ByteArrayOutputStream baosErr = new ByteArrayOutputStream();
System.setErr(new PrintStream(baosErr));
CspFormatter cspFormatter = new CspFormatter();
ConsoleHandler consoleHandler = new ConsoleHandler();
LogRecord logRecord;
// Test FINE level, no log by default
// Default log level is INFO, to log FINE message to stdout, add following config in $JAVA_HOME/jre/lib/logging.properties
// java.util.logging.StreamHandler.level=FINE
logRecord = new LogRecord(Level.FINE, "test fine message");
consoleHandler.publish(logRecord);
try {
consoleHandler.getExecutor().shutdown();
consoleHandler.getExecutor().awaitTermination(1, TimeUnit.SECONDS);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
consoleHandler.close();
assertEquals("", baosOut.toString());
assertEquals("", baosErr.toString());
baosOut.reset();
baosErr.reset();
}
@Test
public void testSeverePublish() {
ByteArrayOutputStream baosOut = new ByteArrayOutputStream();
System.setOut(new PrintStream(baosOut));
ByteArrayOutputStream baosErr = new ByteArrayOutputStream();
System.setErr(new PrintStream(baosErr));
CspFormatter cspFormatter = new CspFormatter();
ConsoleHandler consoleHandler = new ConsoleHandler();
LogRecord logRecord;
// Test SEVERE level, should log to stderr
logRecord = new LogRecord(Level.SEVERE, "test severe message");
consoleHandler.publish(logRecord);
try {
consoleHandler.getExecutor().shutdown();
consoleHandler.getExecutor().awaitTermination(1, TimeUnit.SECONDS);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
consoleHandler.close();
assertEquals(cspFormatter.format(logRecord), baosErr.toString());
assertEquals("", baosOut.toString());
baosOut.reset();
baosErr.reset();
}
}