Add default circuit breaker rule support (#2232)
This commit is contained in:
parent
6a8a01a8c1
commit
8b43caede4
|
|
@ -0,0 +1,134 @@
|
|||
package com.alibaba.csp.sentinel.slots.block.degrade;
|
||||
|
||||
import com.alibaba.csp.sentinel.log.RecordLog;
|
||||
import com.alibaba.csp.sentinel.property.DynamicSentinelProperty;
|
||||
import com.alibaba.csp.sentinel.property.PropertyListener;
|
||||
import com.alibaba.csp.sentinel.property.SentinelProperty;
|
||||
import com.alibaba.csp.sentinel.slots.block.RuleConstant;
|
||||
import com.alibaba.csp.sentinel.slots.block.degrade.circuitbreaker.CircuitBreaker;
|
||||
import com.alibaba.csp.sentinel.slots.block.degrade.circuitbreaker.ExceptionCircuitBreaker;
|
||||
import com.alibaba.csp.sentinel.slots.block.degrade.circuitbreaker.ResponseTimeCircuitBreaker;
|
||||
import com.alibaba.csp.sentinel.util.StringUtil;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
/**
|
||||
* @author wuwen
|
||||
*/
|
||||
public class DefaultDegradeRuleManager {
|
||||
|
||||
public static final String DEFAULT_KEY = "*";
|
||||
|
||||
private static volatile Map<String, List<CircuitBreaker>> circuitBreakers = new ConcurrentHashMap<>();
|
||||
private static volatile Set<DegradeRule> rules = new HashSet<>();
|
||||
|
||||
private static final DefaultDegradeRuleManager.RulePropertyListener LISTENER = new DefaultDegradeRuleManager.RulePropertyListener();
|
||||
private static SentinelProperty<List<DegradeRule>> currentProperty
|
||||
= new DynamicSentinelProperty<>();
|
||||
|
||||
static {
|
||||
currentProperty.addListener(LISTENER);
|
||||
}
|
||||
|
||||
|
||||
static List<CircuitBreaker> getDefaultCircuitBreakers(String resourceName) {
|
||||
List<CircuitBreaker> circuitBreakers = DefaultDegradeRuleManager.circuitBreakers.get(resourceName);
|
||||
if (circuitBreakers == null && !rules.isEmpty()) {
|
||||
return DefaultDegradeRuleManager.circuitBreakers.computeIfAbsent(resourceName,
|
||||
r -> rules.stream().map(DefaultDegradeRuleManager::newCircuitBreakerFrom).collect(Collectors.toList()));
|
||||
}
|
||||
return circuitBreakers;
|
||||
}
|
||||
|
||||
/**
|
||||
* Load {@link DegradeRule}s, former rules will be replaced.
|
||||
*
|
||||
* @param rules new rules to load.
|
||||
*/
|
||||
public static void loadRules(List<DegradeRule> rules) {
|
||||
try {
|
||||
currentProperty.updateValue(rules);
|
||||
} catch (Throwable e) {
|
||||
RecordLog.error("[DefaultDegradeRuleManager] Unexpected error when loading degrade rules", e);
|
||||
}
|
||||
}
|
||||
|
||||
public static boolean isValidDefaultRule(DegradeRule rule) {
|
||||
if (!DegradeRuleManager.isValidRule(rule)) {
|
||||
return false;
|
||||
}
|
||||
|
||||
return rule.getResource().equals(DEFAULT_KEY);
|
||||
}
|
||||
|
||||
/**
|
||||
* Create a circuit breaker instance from provided circuit breaking rule.
|
||||
*
|
||||
* @param rule a valid circuit breaking rule
|
||||
* @return new circuit breaker based on provided rule; null if rule is invalid or unsupported type
|
||||
*/
|
||||
private static CircuitBreaker newCircuitBreakerFrom(/*@Valid*/ DegradeRule rule) {
|
||||
switch (rule.getGrade()) {
|
||||
case RuleConstant.DEGRADE_GRADE_RT:
|
||||
return new ResponseTimeCircuitBreaker(rule);
|
||||
case RuleConstant.DEGRADE_GRADE_EXCEPTION_RATIO:
|
||||
case RuleConstant.DEGRADE_GRADE_EXCEPTION_COUNT:
|
||||
return new ExceptionCircuitBreaker(rule);
|
||||
default:
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
private static class RulePropertyListener implements PropertyListener<List<DegradeRule>> {
|
||||
|
||||
private synchronized void reloadFrom(List<DegradeRule> list) {
|
||||
|
||||
if (list == null) {
|
||||
return;
|
||||
}
|
||||
|
||||
Set<DegradeRule> rules = new HashSet<>();
|
||||
List<CircuitBreaker> cbs = new ArrayList<>();
|
||||
|
||||
for (DegradeRule rule : list) {
|
||||
if (!isValidDefaultRule(rule)) {
|
||||
RecordLog.warn("[DefaultDegradeRuleManager] Ignoring invalid rule when loading new rules: {}", rule);
|
||||
} else {
|
||||
|
||||
if (StringUtil.isBlank(rule.getLimitApp())) {
|
||||
rule.setLimitApp(RuleConstant.LIMIT_APP_DEFAULT);
|
||||
}
|
||||
CircuitBreaker cb = newCircuitBreakerFrom(rule);
|
||||
cbs.add(cb);
|
||||
rules.add(rule);
|
||||
}
|
||||
}
|
||||
|
||||
Map<String, List<CircuitBreaker>> cbMap = new ConcurrentHashMap<>(8);
|
||||
|
||||
DefaultDegradeRuleManager.circuitBreakers.forEach((k, v) -> cbMap.put(k, cbs));
|
||||
|
||||
DefaultDegradeRuleManager.rules = rules;
|
||||
DefaultDegradeRuleManager.circuitBreakers = cbMap;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void configUpdate(List<DegradeRule> conf) {
|
||||
reloadFrom(conf);
|
||||
RecordLog.info("[DefaultDegradeRuleManager] Degrade rules has been updated to: {}", rules);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void configLoad(List<DegradeRule> conf) {
|
||||
reloadFrom(conf);
|
||||
RecordLog.info("[DefaultDegradeRuleManager] Degrade rules loaded: {}", rules);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,76 @@
|
|||
package com.alibaba.csp.sentinel.slots.block.degrade;
|
||||
|
||||
import com.alibaba.csp.sentinel.Constants;
|
||||
import com.alibaba.csp.sentinel.Entry;
|
||||
import com.alibaba.csp.sentinel.context.Context;
|
||||
import com.alibaba.csp.sentinel.node.DefaultNode;
|
||||
import com.alibaba.csp.sentinel.slotchain.AbstractLinkedProcessorSlot;
|
||||
import com.alibaba.csp.sentinel.slotchain.ResourceWrapper;
|
||||
import com.alibaba.csp.sentinel.slots.block.BlockException;
|
||||
import com.alibaba.csp.sentinel.slots.block.degrade.circuitbreaker.CircuitBreaker;
|
||||
import com.alibaba.csp.sentinel.spi.Spi;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* @author wuwen
|
||||
*/
|
||||
@Spi(order = Constants.ORDER_DEGRADE_SLOT + 100)
|
||||
public class DefaultDegradeSlot extends AbstractLinkedProcessorSlot<DefaultNode> {
|
||||
|
||||
@Override
|
||||
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
|
||||
boolean prioritized, Object... args) throws Throwable {
|
||||
performChecking(context, resourceWrapper);
|
||||
|
||||
fireEntry(context, resourceWrapper, node, count, prioritized, args);
|
||||
}
|
||||
|
||||
private void performChecking(Context context, ResourceWrapper r) throws BlockException {
|
||||
|
||||
if (DegradeRuleManager.hasConfig(r.getName())) {
|
||||
return;
|
||||
}
|
||||
|
||||
List<CircuitBreaker> circuitBreakers = DefaultDegradeRuleManager.getDefaultCircuitBreakers(r.getName());
|
||||
|
||||
if (circuitBreakers == null || circuitBreakers.isEmpty()) {
|
||||
return;
|
||||
}
|
||||
|
||||
for (CircuitBreaker cb : circuitBreakers) {
|
||||
if (!cb.tryPass(context)) {
|
||||
throw new DegradeException(cb.getRule().getLimitApp(), cb.getRule());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void exit(Context context, ResourceWrapper r, int count, Object... args) {
|
||||
Entry curEntry = context.getCurEntry();
|
||||
if (curEntry.getBlockError() != null) {
|
||||
fireExit(context, r, count, args);
|
||||
return;
|
||||
}
|
||||
|
||||
if (DegradeRuleManager.hasConfig(r.getName())) {
|
||||
fireExit(context, r, count, args);
|
||||
return;
|
||||
}
|
||||
|
||||
List<CircuitBreaker> circuitBreakers = DefaultDegradeRuleManager.getDefaultCircuitBreakers(r.getName());
|
||||
|
||||
if (circuitBreakers == null || circuitBreakers.isEmpty()) {
|
||||
return;
|
||||
}
|
||||
|
||||
if (curEntry.getBlockError() == null) {
|
||||
// passed request
|
||||
for (CircuitBreaker circuitBreaker : circuitBreakers) {
|
||||
circuitBreaker.onRequestComplete(context);
|
||||
}
|
||||
}
|
||||
|
||||
fireExit(context, r, count, args);
|
||||
}
|
||||
}
|
||||
|
|
@ -6,4 +6,5 @@ com.alibaba.csp.sentinel.slots.statistic.StatisticSlot
|
|||
com.alibaba.csp.sentinel.slots.block.authority.AuthoritySlot
|
||||
com.alibaba.csp.sentinel.slots.system.SystemSlot
|
||||
com.alibaba.csp.sentinel.slots.block.flow.FlowSlot
|
||||
com.alibaba.csp.sentinel.slots.block.degrade.DegradeSlot
|
||||
com.alibaba.csp.sentinel.slots.block.degrade.DegradeSlot
|
||||
com.alibaba.csp.sentinel.slots.block.degrade.DefaultDegradeSlot
|
||||
|
|
@ -18,6 +18,7 @@ package com.alibaba.csp.sentinel.slots;
|
|||
import com.alibaba.csp.sentinel.slotchain.AbstractLinkedProcessorSlot;
|
||||
import com.alibaba.csp.sentinel.slotchain.ProcessorSlotChain;
|
||||
import com.alibaba.csp.sentinel.slots.block.authority.AuthoritySlot;
|
||||
import com.alibaba.csp.sentinel.slots.block.degrade.DefaultDegradeSlot;
|
||||
import com.alibaba.csp.sentinel.slots.block.degrade.DegradeSlot;
|
||||
import com.alibaba.csp.sentinel.slots.block.flow.FlowSlot;
|
||||
import com.alibaba.csp.sentinel.slots.clusterbuilder.ClusterBuilderSlot;
|
||||
|
|
@ -70,6 +71,9 @@ public class DefaultSlotChainBuilderTest {
|
|||
next = next.getNext();
|
||||
assertTrue(next instanceof DegradeSlot);
|
||||
|
||||
next = next.getNext();
|
||||
assertTrue(next instanceof DefaultDegradeSlot);
|
||||
|
||||
next = next.getNext();
|
||||
assertNull(next);
|
||||
|
||||
|
|
|
|||
|
|
@ -44,16 +44,17 @@ public class CircuitBreakingIntegrationTest extends AbstractTimeBasedTest {
|
|||
|
||||
@Before
|
||||
public void setUp() {
|
||||
DegradeRuleManager.loadRules(new ArrayList<DegradeRule>());
|
||||
DegradeRuleManager.loadRules(new ArrayList<>());
|
||||
}
|
||||
|
||||
@After
|
||||
public void tearDown() throws Exception {
|
||||
DegradeRuleManager.loadRules(new ArrayList<DegradeRule>());
|
||||
public void tearDown() {
|
||||
DegradeRuleManager.loadRules(new ArrayList<>());
|
||||
DefaultDegradeRuleManager.loadRules(new ArrayList<>());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSlowRequestMode() throws Exception {
|
||||
public void testSlowRequestMode() {
|
||||
CircuitBreakerStateChangeObserver observer = mock(CircuitBreakerStateChangeObserver.class);
|
||||
setCurrentMillis(System.currentTimeMillis() / 1000 * 1000);
|
||||
int retryTimeoutSec = 5;
|
||||
|
|
@ -115,7 +116,69 @@ public class CircuitBreakingIntegrationTest extends AbstractTimeBasedTest {
|
|||
}
|
||||
|
||||
@Test
|
||||
public void testExceptionRatioMode() throws Exception {
|
||||
public void testSlowRequestModeUseDefaultRule() {
|
||||
CircuitBreakerStateChangeObserver observer = mock(CircuitBreakerStateChangeObserver.class);
|
||||
setCurrentMillis(System.currentTimeMillis() / 1000 * 1000);
|
||||
int retryTimeoutSec = 5;
|
||||
int maxRt = 50;
|
||||
int statIntervalMs = 20000;
|
||||
int minRequestAmount = 10;
|
||||
String res = "CircuitBreakingIntegrationTest_testSlowRequestModeUseDefaultRule";
|
||||
EventObserverRegistry.getInstance().addStateChangeObserver(res, observer);
|
||||
|
||||
DefaultDegradeRuleManager.loadRules(Arrays.asList(
|
||||
new DegradeRule(DefaultDegradeRuleManager.DEFAULT_KEY).setTimeWindow(retryTimeoutSec).setCount(maxRt)
|
||||
.setStatIntervalMs(statIntervalMs).setMinRequestAmount(minRequestAmount)
|
||||
.setSlowRatioThreshold(0.8d).setGrade(0)));
|
||||
|
||||
// Try first N requests where N = minRequestAmount.
|
||||
for (int i = 0; i < minRequestAmount; i++) {
|
||||
if (i < 7) {
|
||||
assertTrue(entryAndSleepFor(res, maxRt + ThreadLocalRandom.current().nextInt(10, 20)));
|
||||
} else {
|
||||
assertTrue(entryAndSleepFor(res, maxRt + ThreadLocalRandom.current().nextInt(-20, -10)));
|
||||
}
|
||||
}
|
||||
|
||||
// Till now slow ratio should be 70%.
|
||||
assertTrue(entryAndSleepFor(res, maxRt + ThreadLocalRandom.current().nextInt(10, 20)));
|
||||
assertTrue(entryAndSleepFor(res, maxRt + ThreadLocalRandom.current().nextInt(10, 20)));
|
||||
assertTrue(entryAndSleepFor(res, maxRt + ThreadLocalRandom.current().nextInt(10, 20)));
|
||||
assertTrue(entryAndSleepFor(res, maxRt + ThreadLocalRandom.current().nextInt(10, 20)));
|
||||
assertTrue(entryAndSleepFor(res, maxRt + ThreadLocalRandom.current().nextInt(10, 20)));
|
||||
assertTrue(entryAndSleepFor(res, maxRt + ThreadLocalRandom.current().nextInt(10, 20)));
|
||||
// Circuit breaker has transformed to OPEN since here.
|
||||
verify(observer)
|
||||
.onStateChange(eq(State.CLOSED), eq(State.OPEN), any(DegradeRule.class), anyDouble());
|
||||
assertEquals(State.OPEN, DefaultDegradeRuleManager.getDefaultCircuitBreakers(res).get(0).currentState());
|
||||
assertFalse(entryAndSleepFor(res, 1));
|
||||
|
||||
sleepSecond(1);
|
||||
assertFalse(entryAndSleepFor(res, 1));
|
||||
sleepSecond(retryTimeoutSec);
|
||||
// Test HALF-OPEN to OPEN.
|
||||
assertTrue(entryAndSleepFor(res, maxRt + ThreadLocalRandom.current().nextInt(10, 20)));
|
||||
|
||||
verify(observer)
|
||||
.onStateChange(eq(State.OPEN), eq(State.HALF_OPEN), any(DegradeRule.class), nullable(Double.class));
|
||||
verify(observer)
|
||||
.onStateChange(eq(State.HALF_OPEN), eq(State.OPEN), any(DegradeRule.class), anyDouble());
|
||||
// Wait for next retry timeout;
|
||||
reset(observer);
|
||||
sleepSecond(retryTimeoutSec + 1);
|
||||
assertTrue(entryAndSleepFor(res, maxRt - ThreadLocalRandom.current().nextInt(10, 20)));
|
||||
verify(observer)
|
||||
.onStateChange(eq(State.OPEN), eq(State.HALF_OPEN), any(DegradeRule.class), nullable(Double.class));
|
||||
verify(observer)
|
||||
.onStateChange(eq(State.HALF_OPEN), eq(State.CLOSED), any(DegradeRule.class), nullable(Double.class));
|
||||
// Now circuit breaker has been closed.
|
||||
assertTrue(entryAndSleepFor(res, maxRt + ThreadLocalRandom.current().nextInt(10, 20)));
|
||||
|
||||
EventObserverRegistry.getInstance().removeStateChangeObserver(res);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testExceptionRatioMode() {
|
||||
CircuitBreakerStateChangeObserver observer = mock(CircuitBreakerStateChangeObserver.class);
|
||||
setCurrentMillis(System.currentTimeMillis() / 1000 * 1000);
|
||||
int retryTimeoutSec = 5;
|
||||
|
|
@ -169,7 +232,7 @@ public class CircuitBreakingIntegrationTest extends AbstractTimeBasedTest {
|
|||
}
|
||||
|
||||
@Test
|
||||
public void testExceptionCountMode() throws Throwable {
|
||||
public void testExceptionCountMode() {
|
||||
// TODO
|
||||
}
|
||||
|
||||
|
|
@ -188,7 +251,7 @@ public class CircuitBreakingIntegrationTest extends AbstractTimeBasedTest {
|
|||
}
|
||||
|
||||
@Test
|
||||
public void testMultipleHalfOpenedBreakers() throws Exception {
|
||||
public void testMultipleHalfOpenedBreakers() {
|
||||
CircuitBreakerStateChangeObserver observer = mock(CircuitBreakerStateChangeObserver.class);
|
||||
setCurrentMillis(System.currentTimeMillis() / 1000 * 1000);
|
||||
int retryTimeoutSec = 2;
|
||||
|
|
|
|||
|
|
@ -22,6 +22,7 @@ import com.alibaba.csp.sentinel.slotchain.ProcessorSlot;
|
|||
import com.alibaba.csp.sentinel.slotchain.SlotChainBuilder;
|
||||
import com.alibaba.csp.sentinel.slots.DefaultSlotChainBuilder;
|
||||
import com.alibaba.csp.sentinel.slots.block.authority.AuthoritySlot;
|
||||
import com.alibaba.csp.sentinel.slots.block.degrade.DefaultDegradeSlot;
|
||||
import com.alibaba.csp.sentinel.slots.block.degrade.DegradeSlot;
|
||||
import com.alibaba.csp.sentinel.slots.block.flow.FlowSlot;
|
||||
import com.alibaba.csp.sentinel.slots.clusterbuilder.ClusterBuilderSlot;
|
||||
|
|
@ -98,13 +99,14 @@ public class SpiLoaderTest {
|
|||
prototypeSlotClasses.add(NodeSelectorSlot.class);
|
||||
prototypeSlotClasses.add(ClusterBuilderSlot.class);
|
||||
|
||||
List<Class<? extends ProcessorSlot>> singletonSlotClasses = new ArrayList<>(6);
|
||||
List<Class<? extends ProcessorSlot>> singletonSlotClasses = new ArrayList<>(7);
|
||||
singletonSlotClasses.add(LogSlot.class);
|
||||
singletonSlotClasses.add(StatisticSlot.class);
|
||||
singletonSlotClasses.add(AuthoritySlot.class);
|
||||
singletonSlotClasses.add(SystemSlot.class);
|
||||
singletonSlotClasses.add(FlowSlot.class);
|
||||
singletonSlotClasses.add(DegradeSlot.class);
|
||||
singletonSlotClasses.add(DefaultDegradeSlot.class);
|
||||
|
||||
for (int i = 0; i < slots1.size(); i++) {
|
||||
ProcessorSlot slot1 = slots1.get(i);
|
||||
|
|
@ -148,7 +150,7 @@ public class SpiLoaderTest {
|
|||
assertNotNull(sortedSlots);
|
||||
|
||||
// Total 8 default slot in sentinel-core
|
||||
assertEquals(8, sortedSlots.size());
|
||||
assertEquals(9, sortedSlots.size());
|
||||
|
||||
// Verify the order of slot
|
||||
int index = 0;
|
||||
|
|
@ -160,6 +162,7 @@ public class SpiLoaderTest {
|
|||
assertTrue(sortedSlots.get(index++) instanceof SystemSlot);
|
||||
assertTrue(sortedSlots.get(index++) instanceof FlowSlot);
|
||||
assertTrue(sortedSlots.get(index++) instanceof DegradeSlot);
|
||||
assertTrue(sortedSlots.get(index++) instanceof DefaultDegradeSlot);
|
||||
}
|
||||
|
||||
@Test
|
||||
|
|
@ -177,7 +180,7 @@ public class SpiLoaderTest {
|
|||
assertNotNull(slot);
|
||||
|
||||
// NodeSelectorSlot is lowest order priority with @Spi(order = -1000) among all slots
|
||||
assertTrue(slot instanceof DegradeSlot);
|
||||
assertTrue(slot instanceof DefaultDegradeSlot);
|
||||
}
|
||||
|
||||
@Test
|
||||
|
|
|
|||
Loading…
Reference in New Issue