diff --git a/sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/block/flow/FlowRuleUtil.java b/sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/block/flow/FlowRuleUtil.java index f349e952..b607ef6b 100644 --- a/sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/block/flow/FlowRuleUtil.java +++ b/sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/block/flow/FlowRuleUtil.java @@ -19,7 +19,7 @@ import com.alibaba.csp.sentinel.log.RecordLog; import com.alibaba.csp.sentinel.slots.block.ClusterRuleConstant; import com.alibaba.csp.sentinel.slots.block.RuleConstant; import com.alibaba.csp.sentinel.slots.block.flow.controller.DefaultController; -import com.alibaba.csp.sentinel.slots.block.flow.controller.RateLimiterController; +import com.alibaba.csp.sentinel.slots.block.flow.controller.ThrottlingController; import com.alibaba.csp.sentinel.slots.block.flow.controller.WarmUpController; import com.alibaba.csp.sentinel.slots.block.flow.controller.WarmUpRateLimiterController; import com.alibaba.csp.sentinel.util.StringUtil; @@ -136,7 +136,7 @@ public final class FlowRuleUtil { return new WarmUpController(rule.getCount(), rule.getWarmUpPeriodSec(), ColdFactorProperty.coldFactor); case RuleConstant.CONTROL_BEHAVIOR_RATE_LIMITER: - return new RateLimiterController(rule.getMaxQueueingTimeMs(), rule.getCount()); + return new ThrottlingController(rule.getMaxQueueingTimeMs(), rule.getCount()); case RuleConstant.CONTROL_BEHAVIOR_WARM_UP_RATE_LIMITER: return new WarmUpRateLimiterController(rule.getCount(), rule.getWarmUpPeriodSec(), rule.getMaxQueueingTimeMs(), ColdFactorProperty.coldFactor); diff --git a/sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/block/flow/controller/RateLimiterController.java b/sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/block/flow/controller/RateLimiterController.java deleted file mode 100755 index ca07b028..00000000 --- a/sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/block/flow/controller/RateLimiterController.java +++ /dev/null @@ -1,93 +0,0 @@ -/* - * Copyright 1999-2018 Alibaba Group Holding Ltd. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.alibaba.csp.sentinel.slots.block.flow.controller; - -import java.util.concurrent.atomic.AtomicLong; - -import com.alibaba.csp.sentinel.slots.block.flow.TrafficShapingController; - -import com.alibaba.csp.sentinel.util.TimeUtil; -import com.alibaba.csp.sentinel.node.Node; - -/** - * @author jialiang.linjl - */ -public class RateLimiterController implements TrafficShapingController { - - private final int maxQueueingTimeMs; - private final double count; - - private final AtomicLong latestPassedTime = new AtomicLong(-1); - - public RateLimiterController(int timeOut, double count) { - this.maxQueueingTimeMs = timeOut; - this.count = count; - } - - @Override - public boolean canPass(Node node, int acquireCount) { - return canPass(node, acquireCount, false); - } - - @Override - public boolean canPass(Node node, int acquireCount, boolean prioritized) { - // Pass when acquire count is less or equal than 0. - if (acquireCount <= 0) { - return true; - } - // Reject when count is less or equal than 0. - // Otherwise,the costTime will be max of long and waitTime will overflow in some cases. - if (count <= 0) { - return false; - } - - long currentTime = TimeUtil.currentTimeMillis(); - // Calculate the interval between every two requests. - long costTime = Math.round(1.0 * (acquireCount) / count * 1000); - - // Expected pass time of this request. - long expectedTime = costTime + latestPassedTime.get(); - - if (expectedTime <= currentTime) { - // Contention may exist here, but it's okay. - latestPassedTime.set(currentTime); - return true; - } else { - // Calculate the time to wait. - long waitTime = costTime + latestPassedTime.get() - TimeUtil.currentTimeMillis(); - if (waitTime > maxQueueingTimeMs) { - return false; - } else { - long oldTime = latestPassedTime.addAndGet(costTime); - try { - waitTime = oldTime - TimeUtil.currentTimeMillis(); - if (waitTime > maxQueueingTimeMs) { - latestPassedTime.addAndGet(-costTime); - return false; - } - // in race condition waitTime may <= 0 - if (waitTime > 0) { - Thread.sleep(waitTime); - } - return true; - } catch (InterruptedException e) { - } - } - } - return false; - } - -} diff --git a/sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/block/flow/controller/ThrottlingController.java b/sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/block/flow/controller/ThrottlingController.java new file mode 100644 index 00000000..64eaefc5 --- /dev/null +++ b/sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/block/flow/controller/ThrottlingController.java @@ -0,0 +1,167 @@ +/* + * Copyright 1999-2022 Alibaba Group Holding Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alibaba.csp.sentinel.slots.block.flow.controller; + +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.locks.LockSupport; + +import com.alibaba.csp.sentinel.node.Node; +import com.alibaba.csp.sentinel.slots.block.flow.TrafficShapingController; +import com.alibaba.csp.sentinel.util.AssertUtil; +import com.alibaba.csp.sentinel.util.TimeUtil; + +/** + * @author Eric Zhao + * @author jialiang.linjl + * @since 2.0 + */ +public class ThrottlingController implements TrafficShapingController { + + // Refactored from legacy RateLimitController of Sentinel 1.x. + + private static final long MS_TO_NS_OFFSET = TimeUnit.MILLISECONDS.toNanos(1); + + private final int maxQueueingTimeMs; + private final int statDurationMs; + + private final double count; + private final boolean useNanoSeconds; + + private final AtomicLong latestPassedTime = new AtomicLong(-1); + + public ThrottlingController(int queueingTimeoutMs, double maxCountPerStat) { + this(queueingTimeoutMs, maxCountPerStat, 1000); + } + + public ThrottlingController(int queueingTimeoutMs, double maxCountPerStat, int statDurationMs) { + AssertUtil.assertTrue(statDurationMs > 0, "statDurationMs should be positive"); + AssertUtil.assertTrue(maxCountPerStat >= 0, "maxCountPerStat should be >= 0"); + AssertUtil.assertTrue(queueingTimeoutMs >= 0, "queueingTimeoutMs should be >= 0"); + this.maxQueueingTimeMs = queueingTimeoutMs; + this.count = maxCountPerStat; + this.statDurationMs = statDurationMs; + // Use nanoSeconds when durationMs%count != 0 or count/durationMs> 1 (to be accurate) + if (maxCountPerStat > 0) { + this.useNanoSeconds = statDurationMs % Math.round(maxCountPerStat) != 0 || maxCountPerStat / statDurationMs > 1; + } else { + this.useNanoSeconds = false; + } + } + + @Override + public boolean canPass(Node node, int acquireCount) { + return canPass(node, acquireCount, false); + } + + private boolean checkPassUsingNanoSeconds(int acquireCount, double maxCountPerStat) { + final long maxQueueingTimeNs = maxQueueingTimeMs * MS_TO_NS_OFFSET; + long currentTime = System.nanoTime(); + // Calculate the interval between every two requests. + final long costTimeNs = Math.round(1.0d * MS_TO_NS_OFFSET * statDurationMs * acquireCount / maxCountPerStat); + + // Expected pass time of this request. + long expectedTime = costTimeNs + latestPassedTime.get(); + + if (expectedTime <= currentTime) { + // Contention may exist here, but it's okay. + latestPassedTime.set(currentTime); + return true; + } else { + final long curNanos = System.nanoTime(); + // Calculate the time to wait. + long waitTime = costTimeNs + latestPassedTime.get() - curNanos; + if (waitTime > maxQueueingTimeNs) { + return false; + } + + long oldTime = latestPassedTime.addAndGet(costTimeNs); + waitTime = oldTime - curNanos; + if (waitTime > maxQueueingTimeNs) { + latestPassedTime.addAndGet(-costTimeNs); + return false; + } + // in race condition waitTime may <= 0 + if (waitTime > 0) { + sleepNanos(waitTime); + } + return true; + } + } + + private boolean checkPassUsingCachedMs(int acquireCount, double maxCountPerStat) { + long currentTime = TimeUtil.currentTimeMillis(); + // Calculate the interval between every two requests. + long costTime = Math.round(1.0d * statDurationMs * acquireCount / maxCountPerStat); + + // Expected pass time of this request. + long expectedTime = costTime + latestPassedTime.get(); + + if (expectedTime <= currentTime) { + // Contention may exist here, but it's okay. + latestPassedTime.set(currentTime); + return true; + } else { + // Calculate the time to wait. + long waitTime = costTime + latestPassedTime.get() - TimeUtil.currentTimeMillis(); + if (waitTime > maxQueueingTimeMs) { + return false; + } + + long oldTime = latestPassedTime.addAndGet(costTime); + waitTime = oldTime - TimeUtil.currentTimeMillis(); + if (waitTime > maxQueueingTimeMs) { + latestPassedTime.addAndGet(-costTime); + return false; + } + // in race condition waitTime may <= 0 + if (waitTime > 0) { + sleepMs(waitTime); + } + return true; + } + } + + @Override + public boolean canPass(Node node, int acquireCount, boolean prioritized) { + // Pass when acquire count is less or equal than 0. + if (acquireCount <= 0) { + return true; + } + // Reject when count is less or equal than 0. + // Otherwise, the costTime will be max of long and waitTime will overflow in some cases. + if (count <= 0) { + return false; + } + if (useNanoSeconds) { + return checkPassUsingNanoSeconds(acquireCount, this.count); + } else { + return checkPassUsingCachedMs(acquireCount, this.count); + } + } + + private void sleepMs(long ms) { + try { + Thread.sleep(ms); + } catch (InterruptedException e) { + } + } + + private void sleepNanos(long ns) { + LockSupport.parkNanos(ns); + } + +} diff --git a/sentinel-core/src/test/java/com/alibaba/csp/sentinel/slots/block/flow/controller/RateLimiterControllerTest.java b/sentinel-core/src/test/java/com/alibaba/csp/sentinel/slots/block/flow/controller/ThrottlingControllerTest.java old mode 100755 new mode 100644 similarity index 63% rename from sentinel-core/src/test/java/com/alibaba/csp/sentinel/slots/block/flow/controller/RateLimiterControllerTest.java rename to sentinel-core/src/test/java/com/alibaba/csp/sentinel/slots/block/flow/controller/ThrottlingControllerTest.java index 0b83f5c0..6aa6a7ad --- a/sentinel-core/src/test/java/com/alibaba/csp/sentinel/slots/block/flow/controller/RateLimiterControllerTest.java +++ b/sentinel-core/src/test/java/com/alibaba/csp/sentinel/slots/block/flow/controller/ThrottlingControllerTest.java @@ -1,11 +1,11 @@ /* - * Copyright 1999-2018 Alibaba Group Holding Ltd. + * Copyright 1999-2022 Alibaba Group Holding Ltd. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * https://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -15,26 +15,26 @@ */ package com.alibaba.csp.sentinel.slots.block.flow.controller; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; -import static org.mockito.Mockito.mock; - import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicInteger; +import com.alibaba.csp.sentinel.node.Node; +import com.alibaba.csp.sentinel.util.TimeUtil; + import org.junit.Test; -import com.alibaba.csp.sentinel.util.TimeUtil; -import com.alibaba.csp.sentinel.node.Node; +import static org.junit.Assert.*; +import static org.mockito.Mockito.mock; /** + * @author Eric Zhao * @author jialiang.linjl */ -public class RateLimiterControllerTest { +public class ThrottlingControllerTest { @Test - public void testPaceController_normal() throws InterruptedException { - RateLimiterController paceController = new RateLimiterController(500, 10d); + public void testThrottlingControllerNormal() throws InterruptedException { + ThrottlingController paceController = new ThrottlingController(500, 10d); Node node = mock(Node.class); long start = TimeUtil.currentTimeMillis(); @@ -46,12 +46,12 @@ public class RateLimiterControllerTest { } @Test - public void testPaceController_timeout() throws InterruptedException { - final RateLimiterController paceController = new RateLimiterController(500, 10d); + public void testThrottlingControllerQueueTimeout() throws InterruptedException { + final ThrottlingController paceController = new ThrottlingController(500, 10d); final Node node = mock(Node.class); - final AtomicInteger passcount = new AtomicInteger(); - final AtomicInteger blockcount = new AtomicInteger(); + final AtomicInteger passCount = new AtomicInteger(); + final AtomicInteger blockCount = new AtomicInteger(); final CountDownLatch countDown = new CountDownLatch(1); final AtomicInteger done = new AtomicInteger(); @@ -62,9 +62,9 @@ public class RateLimiterControllerTest { boolean pass = paceController.canPass(node, 1); if (pass) { - passcount.incrementAndGet(); + passCount.incrementAndGet(); } else { - blockcount.incrementAndGet(); + blockCount.incrementAndGet(); } done.incrementAndGet(); @@ -73,21 +73,20 @@ public class RateLimiterControllerTest { } } - }, "Thread " + i); + }, "Thread-TestThrottlingControllerQueueTimeout-" + i); thread.start(); } - countDown.await(); - System.out.println("pass:" + passcount.get()); - System.out.println("block" + blockcount.get()); - System.out.println("done" + done.get()); - assertTrue(blockcount.get() > 0); + System.out.println("pass: " + passCount.get()); + System.out.println("block: " + blockCount.get()); + System.out.println("done: " + done.get()); + assertTrue(blockCount.get() > 0); } @Test - public void testPaceController_zeroattack() throws InterruptedException { - RateLimiterController paceController = new RateLimiterController(500, 0d); + public void testThrottlingControllerZeroThreshold() throws InterruptedException { + ThrottlingController paceController = new ThrottlingController(500, 0d); Node node = mock(Node.class); for (int i = 0; i < 2; i++) {