Refactor RateLimiterController: improve accuracy and support maxQps > 1000

* Rename to ThrottlingController
* Improve accuracy: use nanoseconds when necessary and support maxQps threshold > 1000 (i.e. wait < 1ms)

Signed-off-by: Eric Zhao <sczyh16@gmail.com>
This commit is contained in:
Eric Zhao 2022-11-14 23:20:10 +08:00 committed by LearningGp
parent 4e41c14514
commit e34d55273f
4 changed files with 193 additions and 120 deletions

View File

@ -19,7 +19,7 @@ import com.alibaba.csp.sentinel.log.RecordLog;
import com.alibaba.csp.sentinel.slots.block.ClusterRuleConstant; import com.alibaba.csp.sentinel.slots.block.ClusterRuleConstant;
import com.alibaba.csp.sentinel.slots.block.RuleConstant; import com.alibaba.csp.sentinel.slots.block.RuleConstant;
import com.alibaba.csp.sentinel.slots.block.flow.controller.DefaultController; import com.alibaba.csp.sentinel.slots.block.flow.controller.DefaultController;
import com.alibaba.csp.sentinel.slots.block.flow.controller.RateLimiterController; import com.alibaba.csp.sentinel.slots.block.flow.controller.ThrottlingController;
import com.alibaba.csp.sentinel.slots.block.flow.controller.WarmUpController; import com.alibaba.csp.sentinel.slots.block.flow.controller.WarmUpController;
import com.alibaba.csp.sentinel.slots.block.flow.controller.WarmUpRateLimiterController; import com.alibaba.csp.sentinel.slots.block.flow.controller.WarmUpRateLimiterController;
import com.alibaba.csp.sentinel.util.StringUtil; import com.alibaba.csp.sentinel.util.StringUtil;
@ -136,7 +136,7 @@ public final class FlowRuleUtil {
return new WarmUpController(rule.getCount(), rule.getWarmUpPeriodSec(), return new WarmUpController(rule.getCount(), rule.getWarmUpPeriodSec(),
ColdFactorProperty.coldFactor); ColdFactorProperty.coldFactor);
case RuleConstant.CONTROL_BEHAVIOR_RATE_LIMITER: case RuleConstant.CONTROL_BEHAVIOR_RATE_LIMITER:
return new RateLimiterController(rule.getMaxQueueingTimeMs(), rule.getCount()); return new ThrottlingController(rule.getMaxQueueingTimeMs(), rule.getCount());
case RuleConstant.CONTROL_BEHAVIOR_WARM_UP_RATE_LIMITER: case RuleConstant.CONTROL_BEHAVIOR_WARM_UP_RATE_LIMITER:
return new WarmUpRateLimiterController(rule.getCount(), rule.getWarmUpPeriodSec(), return new WarmUpRateLimiterController(rule.getCount(), rule.getWarmUpPeriodSec(),
rule.getMaxQueueingTimeMs(), ColdFactorProperty.coldFactor); rule.getMaxQueueingTimeMs(), ColdFactorProperty.coldFactor);

View File

@ -1,93 +0,0 @@
/*
* Copyright 1999-2018 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.csp.sentinel.slots.block.flow.controller;
import java.util.concurrent.atomic.AtomicLong;
import com.alibaba.csp.sentinel.slots.block.flow.TrafficShapingController;
import com.alibaba.csp.sentinel.util.TimeUtil;
import com.alibaba.csp.sentinel.node.Node;
/**
* @author jialiang.linjl
*/
public class RateLimiterController implements TrafficShapingController {
private final int maxQueueingTimeMs;
private final double count;
private final AtomicLong latestPassedTime = new AtomicLong(-1);
public RateLimiterController(int timeOut, double count) {
this.maxQueueingTimeMs = timeOut;
this.count = count;
}
@Override
public boolean canPass(Node node, int acquireCount) {
return canPass(node, acquireCount, false);
}
@Override
public boolean canPass(Node node, int acquireCount, boolean prioritized) {
// Pass when acquire count is less or equal than 0.
if (acquireCount <= 0) {
return true;
}
// Reject when count is less or equal than 0.
// Otherwise,the costTime will be max of long and waitTime will overflow in some cases.
if (count <= 0) {
return false;
}
long currentTime = TimeUtil.currentTimeMillis();
// Calculate the interval between every two requests.
long costTime = Math.round(1.0 * (acquireCount) / count * 1000);
// Expected pass time of this request.
long expectedTime = costTime + latestPassedTime.get();
if (expectedTime <= currentTime) {
// Contention may exist here, but it's okay.
latestPassedTime.set(currentTime);
return true;
} else {
// Calculate the time to wait.
long waitTime = costTime + latestPassedTime.get() - TimeUtil.currentTimeMillis();
if (waitTime > maxQueueingTimeMs) {
return false;
} else {
long oldTime = latestPassedTime.addAndGet(costTime);
try {
waitTime = oldTime - TimeUtil.currentTimeMillis();
if (waitTime > maxQueueingTimeMs) {
latestPassedTime.addAndGet(-costTime);
return false;
}
// in race condition waitTime may <= 0
if (waitTime > 0) {
Thread.sleep(waitTime);
}
return true;
} catch (InterruptedException e) {
}
}
}
return false;
}
}

View File

@ -0,0 +1,167 @@
/*
* Copyright 1999-2022 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.csp.sentinel.slots.block.flow.controller;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.LockSupport;
import com.alibaba.csp.sentinel.node.Node;
import com.alibaba.csp.sentinel.slots.block.flow.TrafficShapingController;
import com.alibaba.csp.sentinel.util.AssertUtil;
import com.alibaba.csp.sentinel.util.TimeUtil;
/**
* @author Eric Zhao
* @author jialiang.linjl
* @since 2.0
*/
public class ThrottlingController implements TrafficShapingController {
// Refactored from legacy RateLimitController of Sentinel 1.x.
private static final long MS_TO_NS_OFFSET = TimeUnit.MILLISECONDS.toNanos(1);
private final int maxQueueingTimeMs;
private final int statDurationMs;
private final double count;
private final boolean useNanoSeconds;
private final AtomicLong latestPassedTime = new AtomicLong(-1);
public ThrottlingController(int queueingTimeoutMs, double maxCountPerStat) {
this(queueingTimeoutMs, maxCountPerStat, 1000);
}
public ThrottlingController(int queueingTimeoutMs, double maxCountPerStat, int statDurationMs) {
AssertUtil.assertTrue(statDurationMs > 0, "statDurationMs should be positive");
AssertUtil.assertTrue(maxCountPerStat >= 0, "maxCountPerStat should be >= 0");
AssertUtil.assertTrue(queueingTimeoutMs >= 0, "queueingTimeoutMs should be >= 0");
this.maxQueueingTimeMs = queueingTimeoutMs;
this.count = maxCountPerStat;
this.statDurationMs = statDurationMs;
// Use nanoSeconds when durationMs%count != 0 or count/durationMs> 1 (to be accurate)
if (maxCountPerStat > 0) {
this.useNanoSeconds = statDurationMs % Math.round(maxCountPerStat) != 0 || maxCountPerStat / statDurationMs > 1;
} else {
this.useNanoSeconds = false;
}
}
@Override
public boolean canPass(Node node, int acquireCount) {
return canPass(node, acquireCount, false);
}
private boolean checkPassUsingNanoSeconds(int acquireCount, double maxCountPerStat) {
final long maxQueueingTimeNs = maxQueueingTimeMs * MS_TO_NS_OFFSET;
long currentTime = System.nanoTime();
// Calculate the interval between every two requests.
final long costTimeNs = Math.round(1.0d * MS_TO_NS_OFFSET * statDurationMs * acquireCount / maxCountPerStat);
// Expected pass time of this request.
long expectedTime = costTimeNs + latestPassedTime.get();
if (expectedTime <= currentTime) {
// Contention may exist here, but it's okay.
latestPassedTime.set(currentTime);
return true;
} else {
final long curNanos = System.nanoTime();
// Calculate the time to wait.
long waitTime = costTimeNs + latestPassedTime.get() - curNanos;
if (waitTime > maxQueueingTimeNs) {
return false;
}
long oldTime = latestPassedTime.addAndGet(costTimeNs);
waitTime = oldTime - curNanos;
if (waitTime > maxQueueingTimeNs) {
latestPassedTime.addAndGet(-costTimeNs);
return false;
}
// in race condition waitTime may <= 0
if (waitTime > 0) {
sleepNanos(waitTime);
}
return true;
}
}
private boolean checkPassUsingCachedMs(int acquireCount, double maxCountPerStat) {
long currentTime = TimeUtil.currentTimeMillis();
// Calculate the interval between every two requests.
long costTime = Math.round(1.0d * statDurationMs * acquireCount / maxCountPerStat);
// Expected pass time of this request.
long expectedTime = costTime + latestPassedTime.get();
if (expectedTime <= currentTime) {
// Contention may exist here, but it's okay.
latestPassedTime.set(currentTime);
return true;
} else {
// Calculate the time to wait.
long waitTime = costTime + latestPassedTime.get() - TimeUtil.currentTimeMillis();
if (waitTime > maxQueueingTimeMs) {
return false;
}
long oldTime = latestPassedTime.addAndGet(costTime);
waitTime = oldTime - TimeUtil.currentTimeMillis();
if (waitTime > maxQueueingTimeMs) {
latestPassedTime.addAndGet(-costTime);
return false;
}
// in race condition waitTime may <= 0
if (waitTime > 0) {
sleepMs(waitTime);
}
return true;
}
}
@Override
public boolean canPass(Node node, int acquireCount, boolean prioritized) {
// Pass when acquire count is less or equal than 0.
if (acquireCount <= 0) {
return true;
}
// Reject when count is less or equal than 0.
// Otherwise, the costTime will be max of long and waitTime will overflow in some cases.
if (count <= 0) {
return false;
}
if (useNanoSeconds) {
return checkPassUsingNanoSeconds(acquireCount, this.count);
} else {
return checkPassUsingCachedMs(acquireCount, this.count);
}
}
private void sleepMs(long ms) {
try {
Thread.sleep(ms);
} catch (InterruptedException e) {
}
}
private void sleepNanos(long ns) {
LockSupport.parkNanos(ns);
}
}

View File

@ -1,11 +1,11 @@
/* /*
* Copyright 1999-2018 Alibaba Group Holding Ltd. * Copyright 1999-2022 Alibaba Group Holding Ltd.
* *
* Licensed under the Apache License, Version 2.0 (the "License"); * Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License. * you may not use this file except in compliance with the License.
* You may obtain a copy of the License at * You may obtain a copy of the License at
* *
* http://www.apache.org/licenses/LICENSE-2.0 * https://www.apache.org/licenses/LICENSE-2.0
* *
* Unless required by applicable law or agreed to in writing, software * Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, * distributed under the License is distributed on an "AS IS" BASIS,
@ -15,26 +15,26 @@
*/ */
package com.alibaba.csp.sentinel.slots.block.flow.controller; package com.alibaba.csp.sentinel.slots.block.flow.controller;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import com.alibaba.csp.sentinel.node.Node;
import com.alibaba.csp.sentinel.util.TimeUtil;
import org.junit.Test; import org.junit.Test;
import com.alibaba.csp.sentinel.util.TimeUtil; import static org.junit.Assert.*;
import com.alibaba.csp.sentinel.node.Node; import static org.mockito.Mockito.mock;
/** /**
* @author Eric Zhao
* @author jialiang.linjl * @author jialiang.linjl
*/ */
public class RateLimiterControllerTest { public class ThrottlingControllerTest {
@Test @Test
public void testPaceController_normal() throws InterruptedException { public void testThrottlingControllerNormal() throws InterruptedException {
RateLimiterController paceController = new RateLimiterController(500, 10d); ThrottlingController paceController = new ThrottlingController(500, 10d);
Node node = mock(Node.class); Node node = mock(Node.class);
long start = TimeUtil.currentTimeMillis(); long start = TimeUtil.currentTimeMillis();
@ -46,12 +46,12 @@ public class RateLimiterControllerTest {
} }
@Test @Test
public void testPaceController_timeout() throws InterruptedException { public void testThrottlingControllerQueueTimeout() throws InterruptedException {
final RateLimiterController paceController = new RateLimiterController(500, 10d); final ThrottlingController paceController = new ThrottlingController(500, 10d);
final Node node = mock(Node.class); final Node node = mock(Node.class);
final AtomicInteger passcount = new AtomicInteger(); final AtomicInteger passCount = new AtomicInteger();
final AtomicInteger blockcount = new AtomicInteger(); final AtomicInteger blockCount = new AtomicInteger();
final CountDownLatch countDown = new CountDownLatch(1); final CountDownLatch countDown = new CountDownLatch(1);
final AtomicInteger done = new AtomicInteger(); final AtomicInteger done = new AtomicInteger();
@ -62,9 +62,9 @@ public class RateLimiterControllerTest {
boolean pass = paceController.canPass(node, 1); boolean pass = paceController.canPass(node, 1);
if (pass) { if (pass) {
passcount.incrementAndGet(); passCount.incrementAndGet();
} else { } else {
blockcount.incrementAndGet(); blockCount.incrementAndGet();
} }
done.incrementAndGet(); done.incrementAndGet();
@ -73,21 +73,20 @@ public class RateLimiterControllerTest {
} }
} }
}, "Thread " + i); }, "Thread-TestThrottlingControllerQueueTimeout-" + i);
thread.start(); thread.start();
} }
countDown.await(); countDown.await();
System.out.println("pass:" + passcount.get());
System.out.println("block" + blockcount.get());
System.out.println("done" + done.get());
assertTrue(blockcount.get() > 0);
System.out.println("pass: " + passCount.get());
System.out.println("block: " + blockCount.get());
System.out.println("done: " + done.get());
assertTrue(blockCount.get() > 0);
} }
@Test @Test
public void testPaceController_zeroattack() throws InterruptedException { public void testThrottlingControllerZeroThreshold() throws InterruptedException {
RateLimiterController paceController = new RateLimiterController(500, 0d); ThrottlingController paceController = new ThrottlingController(500, 0d);
Node node = mock(Node.class); Node node = mock(Node.class);
for (int i = 0; i < 2; i++) { for (int i = 0; i < 2; i++) {