From d59beaec66ae40a77cda11f917a7bee795082d6e Mon Sep 17 00:00:00 2001 From: Eric Zhao Date: Tue, 18 Jun 2019 13:39:37 +0800 Subject: [PATCH] Fix the bug that numeric overflow might occur when refilling tokens in ParamFlowChecker (#838) - use AtomicLong to replace AtomicInteger Signed-off-by: Eric Zhao --- .../block/flow/param/ParamFlowChecker.java | 20 ++++---- .../block/flow/param/ParameterMetric.java | 8 ++-- .../flow/param/ParamFlowCheckerTest.java | 2 +- .../param/ParamFlowDefaultCheckerTest.java | 48 +++++++++++++++++-- .../block/flow/param/ParamFlowSlotTest.java | 4 +- 5 files changed, 61 insertions(+), 21 deletions(-) diff --git a/sentinel-extension/sentinel-parameter-flow-control/src/main/java/com/alibaba/csp/sentinel/slots/block/flow/param/ParamFlowChecker.java b/sentinel-extension/sentinel-parameter-flow-control/src/main/java/com/alibaba/csp/sentinel/slots/block/flow/param/ParamFlowChecker.java index aa4598f8..055c0560 100644 --- a/sentinel-extension/sentinel-parameter-flow-control/src/main/java/com/alibaba/csp/sentinel/slots/block/flow/param/ParamFlowChecker.java +++ b/sentinel-extension/sentinel-parameter-flow-control/src/main/java/com/alibaba/csp/sentinel/slots/block/flow/param/ParamFlowChecker.java @@ -121,7 +121,7 @@ public final class ParamFlowChecker { static boolean passDefaultLocalCheck(ResourceWrapper resourceWrapper, ParamFlowRule rule, int acquireCount, Object value) { ParameterMetric metric = getParameterMetric(resourceWrapper); - CacheMap tokenCounters = metric == null ? null : metric.getRuleTokenCounter(rule); + CacheMap tokenCounters = metric == null ? null : metric.getRuleTokenCounter(rule); CacheMap timeCounters = metric == null ? null : metric.getRuleTimeCounter(rule); if (tokenCounters == null || timeCounters == null) { @@ -130,7 +130,7 @@ public final class ParamFlowChecker { // Calculate max token count (threshold) Set exclusionItems = rule.getParsedHotItems().keySet(); - int tokenCount = (int)rule.getCount(); + long tokenCount = (long)rule.getCount(); if (exclusionItems.contains(value)) { tokenCount = rule.getParsedHotItems().get(value); } @@ -139,7 +139,7 @@ public final class ParamFlowChecker { return false; } - int maxCount = tokenCount + rule.getBurstCount(); + long maxCount = tokenCount + rule.getBurstCount(); if (acquireCount > maxCount) { return false; } @@ -150,7 +150,7 @@ public final class ParamFlowChecker { AtomicLong lastAddTokenTime = timeCounters.putIfAbsent(value, new AtomicLong(currentTime)); if (lastAddTokenTime == null) { // Token never added, just replenish the tokens and consume {@code acquireCount} immediately. - tokenCounters.putIfAbsent(value, new AtomicInteger(maxCount - acquireCount)); + tokenCounters.putIfAbsent(value, new AtomicLong(maxCount - acquireCount)); return true; } @@ -158,15 +158,15 @@ public final class ParamFlowChecker { long passTime = currentTime - lastAddTokenTime.get(); // A simplified token bucket algorithm that will replenish the tokens only when statistic window has passed. if (passTime > rule.getDurationInSec() * 1000) { - AtomicInteger oldQps = tokenCounters.putIfAbsent(value, new AtomicInteger(maxCount - acquireCount)); + AtomicLong oldQps = tokenCounters.putIfAbsent(value, new AtomicLong(maxCount - acquireCount)); if (oldQps == null) { // Might not be accurate here. lastAddTokenTime.set(currentTime); return true; } else { - int restQps = oldQps.get(); - int toAddCount = (int)((passTime * tokenCount) / (rule.getDurationInSec() * 1000)); - int newQps = (restQps + toAddCount) > maxCount ? (maxCount - acquireCount) + long restQps = oldQps.get(); + long toAddCount = (passTime * tokenCount) / (rule.getDurationInSec() * 1000); + long newQps = toAddCount + restQps > maxCount ? (maxCount - acquireCount) : (restQps + toAddCount - acquireCount); if (newQps < 0) { @@ -179,9 +179,9 @@ public final class ParamFlowChecker { Thread.yield(); } } else { - AtomicInteger oldQps = tokenCounters.get(value); + AtomicLong oldQps = tokenCounters.get(value); if (oldQps != null) { - int oldQpsValue = oldQps.get(); + long oldQpsValue = oldQps.get(); if (oldQpsValue - acquireCount >= 0) { if (oldQps.compareAndSet(oldQpsValue, oldQpsValue - acquireCount)) { return true; diff --git a/sentinel-extension/sentinel-parameter-flow-control/src/main/java/com/alibaba/csp/sentinel/slots/block/flow/param/ParameterMetric.java b/sentinel-extension/sentinel-parameter-flow-control/src/main/java/com/alibaba/csp/sentinel/slots/block/flow/param/ParameterMetric.java index 4cb9d53f..daa436f3 100644 --- a/sentinel-extension/sentinel-parameter-flow-control/src/main/java/com/alibaba/csp/sentinel/slots/block/flow/param/ParameterMetric.java +++ b/sentinel-extension/sentinel-parameter-flow-control/src/main/java/com/alibaba/csp/sentinel/slots/block/flow/param/ParameterMetric.java @@ -51,7 +51,7 @@ public class ParameterMetric { * * @since 1.6.0 */ - private final Map> ruleTokenCounter = new HashMap<>(); + private final Map> ruleTokenCounter = new HashMap<>(); private final Map> threadCountMap = new HashMap<>(); /** @@ -61,7 +61,7 @@ public class ParameterMetric { * @return the associated token counter * @since 1.6.0 */ - public CacheMap getRuleTokenCounter(ParamFlowRule rule) { + public CacheMap getRuleTokenCounter(ParamFlowRule rule) { return ruleTokenCounter.get(rule); } @@ -98,7 +98,7 @@ public class ParameterMetric { synchronized (lock) { if (ruleTokenCounter.get(rule) == null) { long size = Math.min(BASE_PARAM_MAX_CAPACITY * rule.getDurationInSec(), TOTAL_MAX_CAPACITY); - ruleTokenCounter.put(rule, new ConcurrentLinkedHashMapWrapper(size)); + ruleTokenCounter.put(rule, new ConcurrentLinkedHashMapWrapper(size)); } } } @@ -245,7 +245,7 @@ public class ParameterMetric { * * @return the token counter map */ - Map> getRuleTokenCounterMap() { + Map> getRuleTokenCounterMap() { return ruleTokenCounter; } diff --git a/sentinel-extension/sentinel-parameter-flow-control/src/test/java/com/alibaba/csp/sentinel/slots/block/flow/param/ParamFlowCheckerTest.java b/sentinel-extension/sentinel-parameter-flow-control/src/test/java/com/alibaba/csp/sentinel/slots/block/flow/param/ParamFlowCheckerTest.java index 70c549f2..d7010b7a 100644 --- a/sentinel-extension/sentinel-parameter-flow-control/src/test/java/com/alibaba/csp/sentinel/slots/block/flow/param/ParamFlowCheckerTest.java +++ b/sentinel-extension/sentinel-parameter-flow-control/src/test/java/com/alibaba/csp/sentinel/slots/block/flow/param/ParamFlowCheckerTest.java @@ -160,7 +160,7 @@ public class ParamFlowCheckerTest { ParameterMetric metric = new ParameterMetric(); ParameterMetricStorage.getMetricsMap().put(resourceWrapper.getName(), metric); metric.getRuleTimeCounterMap().put(rule, new ConcurrentLinkedHashMapWrapper(4000)); - metric.getRuleTokenCounterMap().put(rule, new ConcurrentLinkedHashMapWrapper(4000)); + metric.getRuleTokenCounterMap().put(rule, new ConcurrentLinkedHashMapWrapper(4000)); assertTrue(ParamFlowChecker.passCheck(resourceWrapper, rule, 1, list)); assertFalse(ParamFlowChecker.passCheck(resourceWrapper, rule, 1, list)); diff --git a/sentinel-extension/sentinel-parameter-flow-control/src/test/java/com/alibaba/csp/sentinel/slots/block/flow/param/ParamFlowDefaultCheckerTest.java b/sentinel-extension/sentinel-parameter-flow-control/src/test/java/com/alibaba/csp/sentinel/slots/block/flow/param/ParamFlowDefaultCheckerTest.java index fca4fb1c..581c8522 100644 --- a/sentinel-extension/sentinel-parameter-flow-control/src/test/java/com/alibaba/csp/sentinel/slots/block/flow/param/ParamFlowDefaultCheckerTest.java +++ b/sentinel-extension/sentinel-parameter-flow-control/src/test/java/com/alibaba/csp/sentinel/slots/block/flow/param/ParamFlowDefaultCheckerTest.java @@ -38,9 +38,49 @@ import com.alibaba.csp.sentinel.util.TimeUtil; /** * @author jialiang.linjl + * @author Eric Zhao */ public class ParamFlowDefaultCheckerTest extends AbstractTimeBasedTest { + @Test + public void testCheckQpsWithLongIntervalAndHighThreshold() { + // This test case is intended to avoid number overflow. + final String resourceName = "testCheckQpsWithLongIntervalAndHighThreshold"; + final ResourceWrapper resourceWrapper = new StringResourceWrapper(resourceName, EntryType.IN); + int paramIdx = 0; + + // Set a large threshold. + long threshold = 25000L; + + ParamFlowRule rule = new ParamFlowRule(resourceName) + .setCount(threshold) + .setParamIdx(paramIdx); + + String valueA = "valueA"; + ParameterMetric metric = new ParameterMetric(); + ParameterMetricStorage.getMetricsMap().put(resourceWrapper.getName(), metric); + metric.getRuleTimeCounterMap().put(rule, new ConcurrentLinkedHashMapWrapper(4000)); + metric.getRuleTokenCounterMap().put(rule, + new ConcurrentLinkedHashMapWrapper(4000)); + + // We mock the time directly to avoid unstable behaviour. + setCurrentMillis(System.currentTimeMillis()); + + assertTrue(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA)); + assertTrue(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA)); + + // 24 hours passed. + // This can make `toAddCount` larger that Integer.MAX_VALUE. + sleep(1000 * 60 * 60 * 24); + assertTrue(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA)); + assertTrue(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA)); + + // 48 hours passed. + sleep(1000 * 60 * 60 * 48); + assertTrue(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA)); + assertTrue(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA)); + } + @Test public void testParamFlowDefaultCheckSingleQps() { final String resourceName = "testParamFlowDefaultCheckSingleQps"; @@ -59,7 +99,7 @@ public class ParamFlowDefaultCheckerTest extends AbstractTimeBasedTest { ParameterMetricStorage.getMetricsMap().put(resourceWrapper.getName(), metric); metric.getRuleTimeCounterMap().put(rule, new ConcurrentLinkedHashMapWrapper(4000)); metric.getRuleTokenCounterMap().put(rule, - new ConcurrentLinkedHashMapWrapper(4000)); + new ConcurrentLinkedHashMapWrapper(4000)); // We mock the time directly to avoid unstable behaviour. setCurrentMillis(System.currentTimeMillis()); @@ -99,7 +139,7 @@ public class ParamFlowDefaultCheckerTest extends AbstractTimeBasedTest { ParameterMetricStorage.getMetricsMap().put(resourceWrapper.getName(), metric); metric.getRuleTimeCounterMap().put(rule, new ConcurrentLinkedHashMapWrapper(4000)); metric.getRuleTokenCounterMap().put(rule, - new ConcurrentLinkedHashMapWrapper(4000)); + new ConcurrentLinkedHashMapWrapper(4000)); // We mock the time directly to avoid unstable behaviour. setCurrentMillis(System.currentTimeMillis()); @@ -169,7 +209,7 @@ public class ParamFlowDefaultCheckerTest extends AbstractTimeBasedTest { ParameterMetricStorage.getMetricsMap().put(resourceWrapper.getName(), metric); metric.getRuleTimeCounterMap().put(rule, new ConcurrentLinkedHashMapWrapper(4000)); metric.getRuleTokenCounterMap().put(rule, - new ConcurrentLinkedHashMapWrapper(4000)); + new ConcurrentLinkedHashMapWrapper(4000)); // We mock the time directly to avoid unstable behaviour. setCurrentMillis(System.currentTimeMillis()); @@ -222,7 +262,7 @@ public class ParamFlowDefaultCheckerTest extends AbstractTimeBasedTest { ParameterMetricStorage.getMetricsMap().put(resourceWrapper.getName(), metric); metric.getRuleTimeCounterMap().put(rule, new ConcurrentLinkedHashMapWrapper(4000)); metric.getRuleTokenCounterMap().put(rule, - new ConcurrentLinkedHashMapWrapper(4000)); + new ConcurrentLinkedHashMapWrapper(4000)); int threadCount = 40; final CountDownLatch waitLatch = new CountDownLatch(threadCount); diff --git a/sentinel-extension/sentinel-parameter-flow-control/src/test/java/com/alibaba/csp/sentinel/slots/block/flow/param/ParamFlowSlotTest.java b/sentinel-extension/sentinel-parameter-flow-control/src/test/java/com/alibaba/csp/sentinel/slots/block/flow/param/ParamFlowSlotTest.java index 0d983844..aadef74a 100644 --- a/sentinel-extension/sentinel-parameter-flow-control/src/test/java/com/alibaba/csp/sentinel/slots/block/flow/param/ParamFlowSlotTest.java +++ b/sentinel-extension/sentinel-parameter-flow-control/src/test/java/com/alibaba/csp/sentinel/slots/block/flow/param/ParamFlowSlotTest.java @@ -99,8 +99,8 @@ public class ParamFlowSlotTest { ParameterMetric metric = mock(ParameterMetric.class); - CacheMap map = new ConcurrentLinkedHashMapWrapper(4000); - CacheMap map2 = new ConcurrentLinkedHashMapWrapper(4000); + CacheMap map = new ConcurrentLinkedHashMapWrapper<>(4000); + CacheMap map2 = new ConcurrentLinkedHashMapWrapper<>(4000); when(metric.getRuleTimeCounter(rule)).thenReturn(map); when(metric.getRuleTokenCounter(rule)).thenReturn(map2); map.put(argToGo, new AtomicLong(TimeUtil.currentTimeMillis()));