fix flaky tests and fix passDefaultLocalCheck (#3367)

* fix test case SentinelDubboConsumerFilterTest#testDegradeSync

* When test is run slow, count bucket will count on next time span, causing failed test.

* dos2unix ParamFlowDefaultCheckerTest.java

* fix testParamFlowDefaultCheckSingleValueCheckQpsMultipleThreads by rule.setDurationInSec(2)

* set threshold as count in 2 seconds to prevent the failure of the unit test when the unit test runs longer than 1 second.

* fix quarkus test by set /txt sleep 300

* If /txt sleep 500 ms, in testSentinelJaxRsQuarkusAdapter, may cause 2 request intervals of more than 1 s, which cause rate limit policy is not effective.

* fix testDegradeAsync

* When test is run slow, count bucket will count on next time span, causing failed test.

* use testcontainers to fix testConsulDataSourceWhenInit

* Project embedded-consul has been deprecated in favour of org.testcontainers:consul
* use consul testcontainers to fix testConsulDataSourceWhenInit, which means docker is required to run tests.
```
Error:  com.alibaba.csp.sentinel.datasource.consul.ConsulDataSourceTest.testConsulDataSourceWhenInit -- Time elapsed: 34.47 s <<< ERROR!
  com.pszymczyk.consul.EmbeddedConsulException: Could not start Consul process in 30 seconds
  	at java.base/jdk.internal.reflect.DirectConstructorHandleAccessor.newInstance(DirectConstructorHandleAccessor.java:62)
  	at java.base/java.lang.reflect.Constructor.newInstanceWithCaller(Constructor.java:502)
  	at java.base/java.lang.reflect.Constructor.newInstance(Constructor.java:486)
  	at org.codehaus.groovy.reflection.CachedConstructor.invoke(CachedConstructor.java:72)
 ```

* introduce intermediate node to avoid ABA problem
This commit is contained in:
Robert Lu 2024-04-17 10:25:25 +08:00 committed by GitHub
parent befdc56885
commit cd02b1dc8d
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
18 changed files with 659 additions and 556 deletions

View File

@ -38,7 +38,7 @@
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
<artifactId>mockito-inline</artifactId>
<scope>test</scope>
</dependency>
<dependency>

View File

@ -1,5 +1,5 @@
/*
* Copyright 1999-2018 Alibaba Group Holding Ltd.
* Copyright 1999-2024 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -15,6 +15,7 @@
*/
package com.alibaba.csp.sentinel;
import com.alibaba.csp.sentinel.adapter.dubbo.AbstractTimeBasedTest;
import com.alibaba.csp.sentinel.adapter.dubbo.config.DubboAdapterGlobalConfig;
import com.alibaba.csp.sentinel.adapter.dubbo.fallback.DefaultDubboFallback;
import com.alibaba.csp.sentinel.config.SentinelConfig;
@ -37,7 +38,7 @@ import java.util.ArrayList;
* @author cdfive
* @author lianglin
*/
public class BaseTest {
public class BaseTest extends AbstractTimeBasedTest {
/**

View File

@ -0,0 +1,49 @@
/*
* Copyright 1999-2024 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.csp.sentinel.adapter.dubbo;
import com.alibaba.csp.sentinel.util.TimeUtil;
import org.mockito.MockedStatic;
import org.mockito.Mockito;
public abstract class AbstractTimeBasedTest {
private long currentMillis = 0;
public MockedStatic<TimeUtil> mockTimeUtil() {
MockedStatic<TimeUtil> mocked = Mockito.mockStatic(TimeUtil.class);
mocked.when(TimeUtil::currentTimeMillis).thenReturn(currentMillis);
return mocked;
}
protected final void useActualTime(MockedStatic<TimeUtil> mocked) {
mocked.when(TimeUtil::currentTimeMillis).thenCallRealMethod();
}
protected final void setCurrentMillis(MockedStatic<TimeUtil> mocked, long cur) {
currentMillis = cur;
mocked.when(TimeUtil::currentTimeMillis).thenReturn(currentMillis);
}
protected final void sleep(MockedStatic<TimeUtil> mocked, long timeInMs) {
currentMillis += timeInMs;
mocked.when(TimeUtil::currentTimeMillis).thenReturn(currentMillis);
}
protected final void sleepSecond(MockedStatic<TimeUtil> mocked, long timeSec) {
sleep(mocked, timeSec * 1000);
}
}

View File

@ -1,5 +1,5 @@
/*
* Copyright 1999-2018 Alibaba Group Holding Ltd.
* Copyright 1999-2024 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -36,11 +36,15 @@ import com.alibaba.csp.sentinel.slots.block.degrade.DegradeRuleManager;
import com.alibaba.csp.sentinel.slots.block.flow.FlowRule;
import com.alibaba.csp.sentinel.slots.block.flow.FlowRuleManager;
import com.alibaba.csp.sentinel.util.TimeUtil;
import org.apache.dubbo.rpc.*;
import org.apache.dubbo.rpc.support.RpcUtils;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.MockedStatic;
import org.mockito.junit.MockitoJUnitRunner;
import java.util.*;
@ -53,6 +57,7 @@ import static org.mockito.Mockito.*;
* @author cdfive
* @author lianglin
*/
@RunWith(MockitoJUnitRunner.class)
public class SentinelDubboConsumerFilterTest extends BaseTest {
private final SentinelDubboConsumerFilter consumerFilter = new SentinelDubboConsumerFilter();
@ -94,62 +99,68 @@ public class SentinelDubboConsumerFilterTest extends BaseTest {
@Test
public void testDegradeAsync() throws InterruptedException {
try (MockedStatic<TimeUtil> mocked = super.mockTimeUtil()) {
setCurrentMillis(mocked, 1740000000000L);
Invocation invocation = DubboTestUtil.getDefaultMockInvocationOne();
Invoker invoker = DubboTestUtil.getDefaultMockInvoker();
Invocation invocation = DubboTestUtil.getDefaultMockInvocationOne();
Invoker invoker = DubboTestUtil.getDefaultMockInvoker();
when(invocation.getAttachment(ASYNC_KEY)).thenReturn(Boolean.TRUE.toString());
initDegradeRule(DubboUtils.getInterfaceName(invoker));
when(invocation.getAttachment(ASYNC_KEY)).thenReturn(Boolean.TRUE.toString());
initDegradeRule(DubboUtils.getInterfaceName(invoker));
Result result = invokeDubboRpc(false, invoker, invocation);
verifyInvocationStructureForCallFinish(invoker, invocation);
assertEquals("normal", result.getValue());
// inc the clusterNode's exception to trigger the fallback
for (int i = 0; i < 5; i++) {
invokeDubboRpc(true, invoker, invocation);
Result result = invokeDubboRpc(false, invoker, invocation);
verifyInvocationStructureForCallFinish(invoker, invocation);
assertEquals("normal", result.getValue());
// inc the clusterNode's exception to trigger the fallback
for (int i = 0; i < 5; i++) {
invokeDubboRpc(true, invoker, invocation);
verifyInvocationStructureForCallFinish(invoker, invocation);
}
Result result2 = invokeDubboRpc(false, invoker, invocation);
assertEquals("fallback", result2.getValue());
// sleeping 1000 ms to reset exception
sleep(mocked, 1000);
Result result3 = invokeDubboRpc(false, invoker, invocation);
assertEquals("normal", result3.getValue());
Context context = ContextUtil.getContext();
assertNull(context);
}
Result result2 = invokeDubboRpc(false, invoker, invocation);
assertEquals("fallback", result2.getValue());
// sleeping 1000 ms to reset exception
Thread.sleep(1000);
Result result3 = invokeDubboRpc(false, invoker, invocation);
assertEquals("normal", result3.getValue());
Context context = ContextUtil.getContext();
assertNull(context);
}
@Test
public void testDegradeSync() throws InterruptedException {
public void testDegradeSync() {
try (MockedStatic<TimeUtil> mocked = super.mockTimeUtil()) {
setCurrentMillis(mocked, 1740000000000L);
Invocation invocation = DubboTestUtil.getDefaultMockInvocationOne();
Invoker invoker = DubboTestUtil.getDefaultMockInvoker();
initDegradeRule(DubboUtils.getInterfaceName(invoker));
Invocation invocation = DubboTestUtil.getDefaultMockInvocationOne();
Invoker invoker = DubboTestUtil.getDefaultMockInvoker();
initDegradeRule(DubboUtils.getInterfaceName(invoker));
Result result = invokeDubboRpc(false, invoker, invocation);
verifyInvocationStructureForCallFinish(invoker, invocation);
assertEquals("normal", result.getValue());
// inc the clusterNode's exception to trigger the fallback
for (int i = 0; i < 5; i++) {
invokeDubboRpc(true, invoker, invocation);
Result result = invokeDubboRpc(false, invoker, invocation);
verifyInvocationStructureForCallFinish(invoker, invocation);
assertEquals("normal", result.getValue());
// inc the clusterNode's exception to trigger the fallback
for (int i = 0; i < 5; i++) {
invokeDubboRpc(true, invoker, invocation);
verifyInvocationStructureForCallFinish(invoker, invocation);
}
Result result2 = invokeDubboRpc(false, invoker, invocation);
assertEquals("fallback", result2.getValue());
// sleeping 1000 ms to reset exception
sleep(mocked, 1000);
Result result3 = invokeDubboRpc(false, invoker, invocation);
assertEquals("normal", result3.getValue());
Context context = ContextUtil.getContext();
assertNull(context);
}
Result result2 = invokeDubboRpc(false, invoker, invocation);
assertEquals("fallback", result2.getValue());
// sleeping 1000 ms to reset exception
Thread.sleep(1000);
Result result3 = invokeDubboRpc(false, invoker, invocation);
assertEquals("normal", result3.getValue());
Context context = ContextUtil.getContext();
assertNull(context);
}
@Test
@ -183,7 +194,6 @@ public class SentinelDubboConsumerFilterTest extends BaseTest {
when(invocation.getAttachment(ASYNC_KEY)).thenReturn(Boolean.TRUE.toString());
final Result result = mock(Result.class);
when(result.hasException()).thenReturn(false);
when(invoker.invoke(invocation)).thenAnswer(invocationOnMock -> {
verifyInvocationStructureForAsyncCall(invoker, invocation);
return result;
@ -203,7 +213,6 @@ public class SentinelDubboConsumerFilterTest extends BaseTest {
final Result result = mock(Result.class);
when(result.hasException()).thenReturn(false);
when(result.getException()).thenReturn(new Exception());
when(invoker.invoke(invocation)).thenAnswer(invocationOnMock -> {
verifyInvocationStructure(invoker, invocation);
return result;

View File

@ -38,7 +38,7 @@
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
<artifactId>mockito-inline</artifactId>
<scope>test</scope>
</dependency>
<dependency>

View File

@ -1,5 +1,5 @@
/*
* Copyright 1999-2018 Alibaba Group Holding Ltd.
* Copyright 1999-2024 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -15,6 +15,7 @@
*/
package com.alibaba.csp.sentinel;
import com.alibaba.csp.sentinel.adapter.dubbo3.AbstractTimeBasedTest;
import com.alibaba.csp.sentinel.adapter.dubbo3.config.DubboAdapterGlobalConfig;
import com.alibaba.csp.sentinel.adapter.dubbo3.fallback.DefaultDubboFallback;
import com.alibaba.csp.sentinel.config.SentinelConfig;
@ -37,7 +38,7 @@ import java.util.ArrayList;
* @author cdfive
* @author lianglin
*/
public class BaseTest {
public class BaseTest extends AbstractTimeBasedTest {
/**

View File

@ -0,0 +1,49 @@
/*
* Copyright 1999-2024 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.csp.sentinel.adapter.dubbo3;
import com.alibaba.csp.sentinel.util.TimeUtil;
import org.mockito.MockedStatic;
import org.mockito.Mockito;
public abstract class AbstractTimeBasedTest {
private long currentMillis = 0;
public MockedStatic<TimeUtil> mockTimeUtil() {
MockedStatic<TimeUtil> mocked = Mockito.mockStatic(TimeUtil.class);
mocked.when(TimeUtil::currentTimeMillis).thenReturn(currentMillis);
return mocked;
}
protected final void useActualTime(MockedStatic<TimeUtil> mocked) {
mocked.when(TimeUtil::currentTimeMillis).thenCallRealMethod();
}
protected final void setCurrentMillis(MockedStatic<TimeUtil> mocked, long cur) {
currentMillis = cur;
mocked.when(TimeUtil::currentTimeMillis).thenReturn(currentMillis);
}
protected final void sleep(MockedStatic<TimeUtil> mocked, long timeInMs) {
currentMillis += timeInMs;
mocked.when(TimeUtil::currentTimeMillis).thenReturn(currentMillis);
}
protected final void sleepSecond(MockedStatic<TimeUtil> mocked, long timeSec) {
sleep(mocked, timeSec * 1000);
}
}

View File

@ -1,5 +1,5 @@
/*
* Copyright 1999-2018 Alibaba Group Holding Ltd.
* Copyright 1999-2024 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -32,12 +32,15 @@ import com.alibaba.csp.sentinel.slots.block.degrade.DegradeRule;
import com.alibaba.csp.sentinel.slots.block.degrade.DegradeRuleManager;
import com.alibaba.csp.sentinel.slots.block.flow.FlowRule;
import com.alibaba.csp.sentinel.slots.block.flow.FlowRuleManager;
import com.alibaba.csp.sentinel.util.TimeUtil;
import org.apache.dubbo.rpc.*;
import org.apache.dubbo.rpc.support.RpcUtils;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.MockedStatic;
import org.mockito.junit.MockitoJUnitRunner;
import java.util.*;
@ -50,6 +53,7 @@ import static org.mockito.Mockito.*;
* @author cdfive
* @author lianglin
*/
@RunWith(MockitoJUnitRunner.class)
public class SentinelDubboConsumerFilterTest extends BaseTest {
private final SentinelDubboConsumerFilter consumerFilter = new SentinelDubboConsumerFilter();
@ -91,62 +95,67 @@ public class SentinelDubboConsumerFilterTest extends BaseTest {
@Test
public void testDegradeAsync() throws InterruptedException {
try (MockedStatic<TimeUtil> mocked = super.mockTimeUtil()) {
setCurrentMillis(mocked, 1740000000000L);
Invocation invocation = DubboTestUtil.getDefaultMockInvocationOne();
Invoker invoker = DubboTestUtil.getDefaultMockInvoker();
Invocation invocation = DubboTestUtil.getDefaultMockInvocationOne();
Invoker invoker = DubboTestUtil.getDefaultMockInvoker();
when(invocation.getAttachment(ASYNC_KEY)).thenReturn(Boolean.TRUE.toString());
initDegradeRule(DubboUtils.getInterfaceName(invoker));
when(invocation.getAttachment(ASYNC_KEY)).thenReturn(Boolean.TRUE.toString());
initDegradeRule(DubboUtils.getInterfaceName(invoker));
Result result = invokeDubboRpc(false, invoker, invocation);
verifyInvocationStructureForCallFinish(invoker, invocation);
assertEquals("normal", result.getValue());
// inc the clusterNode's exception to trigger the fallback
for (int i = 0; i < 5; i++) {
invokeDubboRpc(true, invoker, invocation);
Result result = invokeDubboRpc(false, invoker, invocation);
verifyInvocationStructureForCallFinish(invoker, invocation);
assertEquals("normal", result.getValue());
// inc the clusterNode's exception to trigger the fallback
for (int i = 0; i < 5; i++) {
invokeDubboRpc(true, invoker, invocation);
verifyInvocationStructureForCallFinish(invoker, invocation);
}
Result result2 = invokeDubboRpc(false, invoker, invocation);
assertEquals("fallback", result2.getValue());
// sleeping 1000 ms to reset exception
sleep(mocked, 1000);
Result result3 = invokeDubboRpc(false, invoker, invocation);
assertEquals("normal", result3.getValue());
Context context = ContextUtil.getContext();
assertNull(context);
}
Result result2 = invokeDubboRpc(false, invoker, invocation);
assertEquals("fallback", result2.getValue());
// sleeping 1000 ms to reset exception
Thread.sleep(1000);
Result result3 = invokeDubboRpc(false, invoker, invocation);
assertEquals("normal", result3.getValue());
Context context = ContextUtil.getContext();
assertNull(context);
}
@Test
public void testDegradeSync() throws InterruptedException {
public void testDegradeSync() {
try (MockedStatic<TimeUtil> mocked = super.mockTimeUtil()) {
setCurrentMillis(mocked, 1750000000000L);
Invocation invocation = DubboTestUtil.getDefaultMockInvocationOne();
Invoker invoker = DubboTestUtil.getDefaultMockInvoker();
initDegradeRule(DubboUtils.getInterfaceName(invoker));
Invocation invocation = DubboTestUtil.getDefaultMockInvocationOne();
Invoker invoker = DubboTestUtil.getDefaultMockInvoker();
initDegradeRule(DubboUtils.getInterfaceName(invoker));
Result result = invokeDubboRpc(false, invoker, invocation);
verifyInvocationStructureForCallFinish(invoker, invocation);
assertEquals("normal", result.getValue());
// inc the clusterNode's exception to trigger the fallback
for (int i = 0; i < 5; i++) {
invokeDubboRpc(true, invoker, invocation);
Result result = invokeDubboRpc(false, invoker, invocation);
verifyInvocationStructureForCallFinish(invoker, invocation);
assertEquals("normal", result.getValue());
// inc the clusterNode's exception to trigger the fallback
for (int i = 0; i < 5; i++) {
invokeDubboRpc(true, invoker, invocation);
verifyInvocationStructureForCallFinish(invoker, invocation);
}
Result result2 = invokeDubboRpc(false, invoker, invocation);
assertEquals("fallback", result2.getValue());
// sleeping 1000 ms to reset exception
sleep(mocked, 1000);
Result result3 = invokeDubboRpc(false, invoker, invocation);
assertEquals("normal", result3.getValue());
Context context = ContextUtil.getContext();
assertNull(context);
}
Result result2 = invokeDubboRpc(false, invoker, invocation);
assertEquals("fallback", result2.getValue());
// sleeping 1000 ms to reset exception
Thread.sleep(1000);
Result result3 = invokeDubboRpc(false, invoker, invocation);
assertEquals("normal", result3.getValue());
Context context = ContextUtil.getContext();
assertNull(context);
}
@Test
@ -180,7 +189,6 @@ public class SentinelDubboConsumerFilterTest extends BaseTest {
when(invocation.getAttachment(ASYNC_KEY)).thenReturn(Boolean.TRUE.toString());
final Result result = mock(Result.class);
when(result.hasException()).thenReturn(false);
when(invoker.invoke(invocation)).thenAnswer(invocationOnMock -> {
verifyInvocationStructureForAsyncCall(invoker, invocation);
return result;
@ -200,7 +208,6 @@ public class SentinelDubboConsumerFilterTest extends BaseTest {
final Result result = mock(Result.class);
when(result.hasException()).thenReturn(false);
when(result.getException()).thenReturn(new Exception());
when(invoker.invoke(invocation)).thenAnswer(invocationOnMock -> {
verifyInvocationStructure(invoker, invocation);
return result;
@ -263,7 +270,7 @@ public class SentinelDubboConsumerFilterTest extends BaseTest {
ClusterNode methodClusterNode = methodNode.getClusterNode();
ClusterNode interfaceClusterNode = interfaceNode.getClusterNode();
assertNotSame(methodClusterNode,
interfaceClusterNode);// Different resource->Different ProcessorSlot->Different ClusterNode
interfaceClusterNode);// Different resource->Different ProcessorSlot->Different ClusterNode
// As context origin is "", the StatisticNode should not be created in originCountMap of ClusterNode
Map<String, StatisticNode> methodOriginCountMap = methodClusterNode.getOriginCountMap();
@ -315,7 +322,7 @@ public class SentinelDubboConsumerFilterTest extends BaseTest {
ClusterNode methodClusterNode = methodNode.getClusterNode();
ClusterNode interfaceClusterNode = interfaceNode.getClusterNode();
assertNotSame(methodClusterNode,
interfaceClusterNode);// Different resource->Different ProcessorSlot->Different ClusterNode
interfaceClusterNode);// Different resource->Different ProcessorSlot->Different ClusterNode
// As context origin is "", the StatisticNode should not be created in originCountMap of ClusterNode
Map<String, StatisticNode> methodOriginCountMap = methodClusterNode.getOriginCountMap();
@ -360,8 +367,8 @@ public class SentinelDubboConsumerFilterTest extends BaseTest {
private void initDegradeRule(String resource) {
DegradeRule degradeRule = new DegradeRule(resource)
.setCount(0.5)
.setGrade(DEGRADE_GRADE_EXCEPTION_RATIO);
.setCount(0.5)
.setGrade(DEGRADE_GRADE_EXCEPTION_RATIO);
List<DegradeRule> degradeRules = new ArrayList<>();
degradeRules.add(degradeRule);
degradeRule.setTimeWindow(1);
@ -382,7 +389,7 @@ public class SentinelDubboConsumerFilterTest extends BaseTest {
result = exception ? new AppResponse(new Exception("error")) : new AppResponse("normal");
} else {
result = exception ? AsyncRpcResult.newDefaultAsyncResult(new Exception("error"), invocation)
: AsyncRpcResult.newDefaultAsyncResult("normal", invocation);
: AsyncRpcResult.newDefaultAsyncResult("normal", invocation);
}
when(invoker.invoke(invocation)).thenReturn(result);
return consumerFilter.invoke(invoker, invocation);

View File

@ -1,5 +1,5 @@
/*
* Copyright 1999-2020 Alibaba Group Holding Ltd.
* Copyright 1999-2024 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -37,7 +37,7 @@ public class GreetingResource {
@Path("/txt")
@Produces(MediaType.TEXT_PLAIN)
public String hello() throws InterruptedException {
TimeUnit.MILLISECONDS.sleep(500);
TimeUnit.MILLISECONDS.sleep(300);
return "hello";
}

View File

@ -16,7 +16,6 @@
<java.source.version>1.8</java.source.version>
<java.target.version>1.8</java.target.version>
<consul.version>1.4.5</consul.version>
<consul.process.version>2.2.0</consul.process.version>
</properties>
<dependencies>
@ -36,9 +35,9 @@
<version>${consul.version}</version>
</dependency>
<dependency>
<groupId>com.pszymczyk.consul</groupId>
<artifactId>embedded-consul</artifactId>
<version>${consul.process.version}</version>
<groupId>org.testcontainers</groupId>
<artifactId>consul</artifactId>
<version>1.19.7</version>
<scope>test</scope>
</dependency>
<dependency>

View File

@ -1,5 +1,5 @@
/*
* Copyright 1999-2018 Alibaba Group Holding Ltd.
* Copyright 1999-2024 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -21,15 +21,10 @@ import com.alibaba.csp.sentinel.slots.block.flow.FlowRule;
import com.alibaba.csp.sentinel.slots.block.flow.FlowRuleManager;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.TypeReference;
import com.ecwid.consul.v1.ConsulClient;
import com.ecwid.consul.v1.Response;
import com.pszymczyk.consul.ConsulProcess;
import com.pszymczyk.consul.ConsulStarterBuilder;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.*;
import org.testcontainers.consul.ConsulContainer;
import java.util.ArrayList;
import java.util.List;
@ -40,11 +35,12 @@ import java.util.concurrent.TimeUnit;
* @author wavesZh
*/
public class ConsulDataSourceTest {
@ClassRule
public static ConsulContainer consulContainer = new ConsulContainer("hashicorp/consul:1.15");
private final String ruleKey = "sentinel.rules.flow.ruleKey";
private final int waitTimeoutInSecond = 1;
private ConsulProcess consul;
private ConsulClient client;
private ReadableDataSource<String, List<FlowRule>> consulDataSource;
@ -53,11 +49,8 @@ public class ConsulDataSourceTest {
@Before
public void init() {
this.consul = ConsulStarterBuilder.consulStarter()
.build()
.start();
int port = consul.getHttpPort();
String host = "127.0.0.1";
int port = consulContainer.getMappedPort(8500);
String host = consulContainer.getHost();
client = new ConsulClient(host, port);
Converter<String, List<FlowRule>> flowConfigParser = buildFlowConfigParser();
String flowRulesJson =
@ -76,9 +69,6 @@ public class ConsulDataSourceTest {
if (consulDataSource != null) {
consulDataSource.close();
}
if (consul != null) {
consul.close();
}
FlowRuleManager.loadRules(new ArrayList<>());
}

View File

@ -1,5 +1,5 @@
/*
* Copyright 1999-2018 Alibaba Group Holding Ltd.
* Copyright 1999-2024 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -15,16 +15,6 @@
*/
package com.alibaba.csp.sentinel.slots.block.flow.param;
import java.lang.reflect.Array;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import com.alibaba.csp.sentinel.cluster.ClusterStateManager;
import com.alibaba.csp.sentinel.cluster.TokenResult;
import com.alibaba.csp.sentinel.cluster.TokenResultStatus;
@ -37,6 +27,13 @@ import com.alibaba.csp.sentinel.slots.block.RuleConstant;
import com.alibaba.csp.sentinel.slots.statistic.cache.CacheMap;
import com.alibaba.csp.sentinel.util.TimeUtil;
import java.lang.reflect.Array;
import java.time.format.DateTimeFormatter;
import java.util.*;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
/**
* Rule checker for parameter flow control.
*
@ -46,7 +43,7 @@ import com.alibaba.csp.sentinel.util.TimeUtil;
public final class ParamFlowChecker {
public static boolean passCheck(ResourceWrapper resourceWrapper, /*@Valid*/ ParamFlowRule rule, /*@Valid*/ int count,
Object... args) {
Object... args) {
if (args == null) {
return true;
}
@ -79,7 +76,7 @@ public final class ParamFlowChecker {
Object value) {
try {
if (Collection.class.isAssignableFrom(value.getClass())) {
for (Object param : ((Collection)value)) {
for (Object param : ((Collection) value)) {
if (!passSingleValueCheck(resourceWrapper, rule, count, param)) {
return false;
}
@ -117,7 +114,7 @@ public final class ParamFlowChecker {
int itemThreshold = rule.getParsedHotItems().get(value);
return ++threadCount <= itemThreshold;
}
long threshold = (long)rule.getCount();
long threshold = (long) rule.getCount();
return ++threadCount <= threshold;
}
@ -127,16 +124,16 @@ public final class ParamFlowChecker {
static boolean passDefaultLocalCheck(ResourceWrapper resourceWrapper, ParamFlowRule rule, int acquireCount,
Object value) {
ParameterMetric metric = getParameterMetric(resourceWrapper);
CacheMap<Object, AtomicLong> tokenCounters = metric == null ? null : metric.getRuleTokenCounter(rule);
CacheMap<Object, AtomicLong> timeCounters = metric == null ? null : metric.getRuleTimeCounter(rule);
CacheMap<Object, AtomicReference<TokenUpdateStatus>> tokenCounters = metric == null ? null : metric.getRuleStampedTokenCounter(rule);
if (tokenCounters == null || timeCounters == null) {
DateTimeFormatter dtf = DateTimeFormatter.ISO_DATE_TIME;
if (tokenCounters == null) {
return true;
}
// Calculate max token count (threshold)
Set<Object> exclusionItems = rule.getParsedHotItems().keySet();
long tokenCount = (long)rule.getCount();
long tokenCount = (long) rule.getCount();
if (exclusionItems.contains(value)) {
tokenCount = rule.getParsedHotItems().get(value);
}
@ -153,49 +150,44 @@ public final class ParamFlowChecker {
while (true) {
long currentTime = TimeUtil.currentTimeMillis();
AtomicLong lastAddTokenTime = timeCounters.putIfAbsent(value, new AtomicLong(currentTime));
if (lastAddTokenTime == null) {
AtomicReference<TokenUpdateStatus> atomicLastStatus = tokenCounters.putIfAbsent(value, new AtomicReference<>(
new TokenUpdateStatus(currentTime, maxCount - acquireCount)
));
if (atomicLastStatus == null) {
// Token never added, just replenish the tokens and consume {@code acquireCount} immediately.
tokenCounters.putIfAbsent(value, new AtomicLong(maxCount - acquireCount));
return true;
}
// Calculate the time duration since last token was added.
long passTime = currentTime - lastAddTokenTime.get();
TokenUpdateStatus lastStatus = atomicLastStatus.get();
long passTime = currentTime - lastStatus.getLastAddTokenTime();
// A simplified token bucket algorithm that will replenish the tokens only when statistic window has passed.
long newQps;
if (passTime > rule.getDurationInSec() * 1000) {
AtomicLong oldQps = tokenCounters.putIfAbsent(value, new AtomicLong(maxCount - acquireCount));
if (oldQps == null) {
// Might not be accurate here.
lastAddTokenTime.set(currentTime);
return true;
} else {
long restQps = oldQps.get();
long toAddCount = (passTime * tokenCount) / (rule.getDurationInSec() * 1000);
long newQps = toAddCount + restQps > maxCount ? (maxCount - acquireCount)
long restQps = lastStatus.getRestQps();
long toAddCount = (passTime * tokenCount) / (rule.getDurationInSec() * 1000);
newQps = toAddCount + restQps > maxCount ? (maxCount - acquireCount)
: (restQps + toAddCount - acquireCount);
if (newQps < 0) {
return false;
}
if (oldQps.compareAndSet(restQps, newQps)) {
lastAddTokenTime.set(currentTime);
if (newQps < 0) {
return false;
}
TokenUpdateStatus newStatus = new TokenUpdateStatus(currentTime, newQps);
if (atomicLastStatus.compareAndSet(lastStatus, newStatus)) {
return true;
}
Thread.yield();
} else {
newQps = lastStatus.getRestQps() - acquireCount;
if (newQps >= 0) {
TokenUpdateStatus newStatus = new TokenUpdateStatus(lastStatus.getLastAddTokenTime(), newQps);
if (atomicLastStatus.compareAndSet(lastStatus, newStatus)) {
return true;
}
Thread.yield();
}
} else {
AtomicLong oldQps = tokenCounters.get(value);
if (oldQps != null) {
long oldQpsValue = oldQps.get();
if (oldQpsValue - acquireCount >= 0) {
if (oldQps.compareAndSet(oldQpsValue, oldQpsValue - acquireCount)) {
return true;
}
} else {
return false;
}
} else {
return false;
}
Thread.yield();
}
}
@ -211,7 +203,7 @@ public final class ParamFlowChecker {
// Calculate max token count (threshold)
Set<Object> exclusionItems = rule.getParsedHotItems().keySet();
long tokenCount = (long)rule.getCount();
long tokenCount = (long) rule.getCount();
if (exclusionItems.contains(value)) {
tokenCount = rule.getParsedHotItems().get(value);
}
@ -261,7 +253,7 @@ public final class ParamFlowChecker {
@SuppressWarnings("unchecked")
private static Collection<Object> toCollection(Object value) {
if (value instanceof Collection) {
return (Collection<Object>)value;
return (Collection<Object>) value;
} else if (value.getClass().isArray()) {
List<Object> params = new ArrayList<Object>();
int length = Array.getLength(value);

View File

@ -21,6 +21,7 @@ import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import com.alibaba.csp.sentinel.log.RecordLog;
import com.alibaba.csp.sentinel.slots.statistic.cache.CacheMap;
@ -46,12 +47,14 @@ public class ParameterMetric {
* @since 1.6.0
*/
private final Map<ParamFlowRule, CacheMap<Object, AtomicLong>> ruleTimeCounters = new HashMap<>();
/**
* Format: (rule, (value, tokenCounter))
*
* @since 1.6.0
*/
private final Map<ParamFlowRule, CacheMap<Object, AtomicLong>> ruleTokenCounter = new HashMap<>();
private final Map<ParamFlowRule, CacheMap<Object, AtomicReference<TokenUpdateStatus>>> ruleTokenCounter = new HashMap<>();
private final Map<Integer, CacheMap<Object, AtomicInteger>> threadCountMap = new HashMap<>();
/**
@ -59,12 +62,20 @@ public class ParameterMetric {
*
* @param rule valid parameter rule
* @return the associated token counter
* @since 1.6.0
* @since 1.8.8
*/
public CacheMap<Object, AtomicLong> getRuleTokenCounter(ParamFlowRule rule) {
CacheMap<Object, AtomicReference<TokenUpdateStatus>> getRuleStampedTokenCounter(ParamFlowRule rule) {
return ruleTokenCounter.get(rule);
}
public void clear() {
synchronized (lock) {
ruleTimeCounters.clear();
ruleTokenCounter.clear();
threadCountMap.clear();
}
}
/**
* Get the time record counter for given parameter rule.
*
@ -76,14 +87,6 @@ public class ParameterMetric {
return ruleTimeCounters.get(rule);
}
public void clear() {
synchronized (lock) {
threadCountMap.clear();
ruleTimeCounters.clear();
ruleTokenCounter.clear();
}
}
public void clearForRule(ParamFlowRule rule) {
synchronized (lock) {
ruleTimeCounters.remove(rule);
@ -106,7 +109,7 @@ public class ParameterMetric {
synchronized (lock) {
if (ruleTokenCounter.get(rule) == null) {
long size = Math.min(BASE_PARAM_MAX_CAPACITY * rule.getDurationInSec(), TOTAL_MAX_CAPACITY);
ruleTokenCounter.put(rule, new ConcurrentLinkedHashMapWrapper<Object, AtomicLong>(size));
ruleTokenCounter.put(rule, new ConcurrentLinkedHashMapWrapper<>(size));
}
}
}
@ -253,7 +256,7 @@ public class ParameterMetric {
*
* @return the token counter map
*/
Map<ParamFlowRule, CacheMap<Object, AtomicLong>> getRuleTokenCounterMap() {
Map<ParamFlowRule, CacheMap<Object, AtomicReference<TokenUpdateStatus>>> getRuleTokenCounterMap() {
return ruleTokenCounter;
}

View File

@ -0,0 +1,30 @@
package com.alibaba.csp.sentinel.slots.block.flow.param;
class TokenUpdateStatus {
private final long lastAddTokenTime;
private final long restQps;
public TokenUpdateStatus(long lastAddTokenTime, long restQps) {
this.lastAddTokenTime = lastAddTokenTime;
this.restQps = restQps;
}
public long getLastAddTokenTime() {
return lastAddTokenTime;
}
public long getRestQps() {
return restQps;
}
@Override
public String toString() {
return "TokenUpdateStatus{" +
"hash=" + System.identityHashCode(this) +
", lastAddTokenTime=" + lastAddTokenTime +
", requestCount=" + restQps +
'}';
}
}

View File

@ -1,5 +1,5 @@
/*
* Copyright 1999-2018 Alibaba Group Holding Ltd.
* Copyright 1999-2024 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -15,19 +15,10 @@
*/
package com.alibaba.csp.sentinel.block.flow.param;
import org.junit.runner.RunWith;
import com.alibaba.csp.sentinel.util.TimeUtil;
import org.mockito.MockedStatic;
import org.mockito.Mockito;
import org.mockito.junit.MockitoJUnitRunner;
import com.alibaba.csp.sentinel.util.TimeUtil;
/**
* Mock support for {@link TimeUtil}.
*
* @author jason
*/
@RunWith(MockitoJUnitRunner.class)
public abstract class AbstractTimeBasedTest {
private long currentMillis = 0;

View File

@ -31,6 +31,7 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
@ -158,7 +159,7 @@ public class ParamFlowCheckerTest {
ParameterMetric metric = new ParameterMetric();
ParameterMetricStorage.getMetricsMap().put(resourceWrapper.getName(), metric);
metric.getRuleTimeCounterMap().put(rule, new ConcurrentLinkedHashMapWrapper<Object, AtomicLong>(4000));
metric.getRuleTokenCounterMap().put(rule, new ConcurrentLinkedHashMapWrapper<Object, AtomicLong>(4000));
metric.getRuleTokenCounterMap().put(rule, new ConcurrentLinkedHashMapWrapper<>(4000));
assertTrue(ParamFlowChecker.passCheck(resourceWrapper, rule, 1, list));
assertFalse(ParamFlowChecker.passCheck(resourceWrapper, rule, 1, list));
@ -215,7 +216,7 @@ public class ParamFlowCheckerTest {
ParameterMetric metric = new ParameterMetric();
ParameterMetricStorage.getMetricsMap().put(resourceWrapper.getName(), metric);
metric.getRuleTimeCounterMap().put(rule, new ConcurrentLinkedHashMapWrapper<Object, AtomicLong>(4000));
metric.getRuleTokenCounterMap().put(rule, new ConcurrentLinkedHashMapWrapper<Object, AtomicLong>(4000));
metric.getRuleTokenCounterMap().put(rule, new ConcurrentLinkedHashMapWrapper<>(4000));
assertTrue(ParamFlowChecker.passCheck(resourceWrapper, rule, 1, args));
assertFalse(ParamFlowChecker.passCheck(resourceWrapper, rule, 1, args));

View File

@ -1,346 +1,327 @@
/*
* Copyright 1999-2019 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.csp.sentinel.slots.block.flow.param;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import com.alibaba.csp.sentinel.EntryType;
import com.alibaba.csp.sentinel.slotchain.ResourceWrapper;
import com.alibaba.csp.sentinel.slotchain.StringResourceWrapper;
import com.alibaba.csp.sentinel.slots.statistic.cache.ConcurrentLinkedHashMapWrapper;
import com.alibaba.csp.sentinel.block.flow.param.AbstractTimeBasedTest;
import com.alibaba.csp.sentinel.util.TimeUtil;
import org.mockito.MockedStatic;
/**
* @author jialiang.linjl
* @author Eric Zhao
*/
public class ParamFlowDefaultCheckerTest extends AbstractTimeBasedTest {
@Test
public void testCheckQpsWithLongIntervalAndHighThreshold() {
try (MockedStatic<TimeUtil> mocked = super.mockTimeUtil()) {
// This test case is intended to avoid number overflow.
final String resourceName = "testCheckQpsWithLongIntervalAndHighThreshold";
final ResourceWrapper resourceWrapper = new StringResourceWrapper(resourceName, EntryType.IN);
int paramIdx = 0;
// Set a large threshold.
long threshold = 25000L;
ParamFlowRule rule = new ParamFlowRule(resourceName)
.setCount(threshold)
.setParamIdx(paramIdx);
String valueA = "valueA";
ParameterMetric metric = new ParameterMetric();
ParameterMetricStorage.getMetricsMap().put(resourceWrapper.getName(), metric);
metric.getRuleTimeCounterMap().put(rule, new ConcurrentLinkedHashMapWrapper<Object, AtomicLong>(4000));
metric.getRuleTokenCounterMap().put(rule,
new ConcurrentLinkedHashMapWrapper<Object, AtomicLong>(4000));
// We mock the time directly to avoid unstable behaviour.
setCurrentMillis(mocked, System.currentTimeMillis());
assertTrue(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA));
assertTrue(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA));
// 24 hours passed.
// This can make `toAddCount` larger that Integer.MAX_VALUE.
sleep(mocked, 1000 * 60 * 60 * 24);
assertTrue(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA));
assertTrue(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA));
// 48 hours passed.
sleep(mocked, 1000 * 60 * 60 * 48);
assertTrue(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA));
assertTrue(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA));
}
}
@Test
public void testParamFlowDefaultCheckSingleQps() {
try (MockedStatic<TimeUtil> mocked = super.mockTimeUtil()) {
final String resourceName = "testParamFlowDefaultCheckSingleQps";
final ResourceWrapper resourceWrapper = new StringResourceWrapper(resourceName, EntryType.IN);
int paramIdx = 0;
long threshold = 5L;
ParamFlowRule rule = new ParamFlowRule();
rule.setResource(resourceName);
rule.setCount(threshold);
rule.setParamIdx(paramIdx);
String valueA = "valueA";
ParameterMetric metric = new ParameterMetric();
ParameterMetricStorage.getMetricsMap().put(resourceWrapper.getName(), metric);
metric.getRuleTimeCounterMap().put(rule, new ConcurrentLinkedHashMapWrapper<Object, AtomicLong>(4000));
metric.getRuleTokenCounterMap().put(rule,
new ConcurrentLinkedHashMapWrapper<Object, AtomicLong>(4000));
// We mock the time directly to avoid unstable behaviour.
setCurrentMillis(mocked, System.currentTimeMillis());
assertTrue(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA));
assertTrue(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA));
assertTrue(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA));
assertTrue(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA));
assertTrue(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA));
assertFalse(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA));
sleep(mocked, 3000);
assertTrue(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA));
assertTrue(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA));
assertTrue(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA));
assertTrue(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA));
assertTrue(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA));
assertFalse(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA));
}
}
@Test
public void testParamFlowDefaultCheckSingleQpsWithBurst() throws InterruptedException {
try (MockedStatic<TimeUtil> mocked = super.mockTimeUtil()) {
final String resourceName = "testParamFlowDefaultCheckSingleQpsWithBurst";
final ResourceWrapper resourceWrapper = new StringResourceWrapper(resourceName, EntryType.IN);
int paramIdx = 0;
long threshold = 5L;
ParamFlowRule rule = new ParamFlowRule();
rule.setResource(resourceName);
rule.setCount(threshold);
rule.setParamIdx(paramIdx);
rule.setBurstCount(3);
String valueA = "valueA";
ParameterMetric metric = new ParameterMetric();
ParameterMetricStorage.getMetricsMap().put(resourceWrapper.getName(), metric);
metric.getRuleTimeCounterMap().put(rule, new ConcurrentLinkedHashMapWrapper<Object, AtomicLong>(4000));
metric.getRuleTokenCounterMap().put(rule,
new ConcurrentLinkedHashMapWrapper<Object, AtomicLong>(4000));
// We mock the time directly to avoid unstable behaviour.
setCurrentMillis(mocked, System.currentTimeMillis());
assertTrue(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA));
assertTrue(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA));
assertTrue(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA));
assertTrue(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA));
assertTrue(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA));
assertTrue(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA));
assertTrue(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA));
assertTrue(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA));
assertFalse(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA));
sleep(mocked, 1002);
assertTrue(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA));
assertTrue(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA));
assertTrue(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA));
assertTrue(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA));
assertTrue(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA));
assertFalse(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA));
sleep(mocked, 1002);
assertTrue(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA));
assertTrue(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA));
assertTrue(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA));
assertTrue(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA));
assertTrue(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA));
assertFalse(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA));
sleep(mocked, 2000);
assertTrue(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA));
assertTrue(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA));
assertTrue(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA));
assertTrue(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA));
assertTrue(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA));
assertTrue(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA));
assertTrue(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA));
assertTrue(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA));
assertFalse(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA));
sleep(mocked, 1002);
assertTrue(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA));
assertTrue(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA));
assertTrue(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA));
assertTrue(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA));
assertTrue(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA));
assertFalse(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA));
}
}
@Test
public void testParamFlowDefaultCheckQpsInDifferentDuration() throws InterruptedException {
try (MockedStatic<TimeUtil> mocked = super.mockTimeUtil()) {
final String resourceName = "testParamFlowDefaultCheckQpsInDifferentDuration";
final ResourceWrapper resourceWrapper = new StringResourceWrapper(resourceName, EntryType.IN);
int paramIdx = 0;
long threshold = 5L;
ParamFlowRule rule = new ParamFlowRule();
rule.setResource(resourceName);
rule.setCount(threshold);
rule.setParamIdx(paramIdx);
rule.setDurationInSec(60);
String valueA = "helloWorld";
ParameterMetric metric = new ParameterMetric();
ParameterMetricStorage.getMetricsMap().put(resourceWrapper.getName(), metric);
metric.getRuleTimeCounterMap().put(rule, new ConcurrentLinkedHashMapWrapper<Object, AtomicLong>(4000));
metric.getRuleTokenCounterMap().put(rule,
new ConcurrentLinkedHashMapWrapper<Object, AtomicLong>(4000));
// We mock the time directly to avoid unstable behaviour.
setCurrentMillis(mocked, System.currentTimeMillis());
assertTrue(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA));
assertTrue(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA));
assertTrue(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA));
assertTrue(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA));
assertTrue(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA));
assertFalse(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA));
sleepSecond(mocked, 1);
assertFalse(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA));
sleepSecond(mocked, 10);
assertFalse(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA));
sleepSecond(mocked, 30);
assertFalse(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA));
sleepSecond(mocked, 30);
assertTrue(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA));
assertTrue(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA));
assertTrue(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA));
assertTrue(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA));
assertTrue(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA));
assertFalse(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA));
}
}
@Test
public void testParamFlowDefaultCheckSingleValueCheckQpsMultipleThreads() throws Exception {
try (MockedStatic<TimeUtil> mocked = super.mockTimeUtil()) {
// In this test case we use the actual time.
useActualTime(mocked);
final String resourceName = "testParamFlowDefaultCheckSingleValueCheckQpsMultipleThreads";
final ResourceWrapper resourceWrapper = new StringResourceWrapper(resourceName, EntryType.IN);
int paramIdx = 0;
long threshold = 5L;
final ParamFlowRule rule = new ParamFlowRule();
rule.setResource(resourceName);
rule.setCount(threshold);
rule.setParamIdx(paramIdx);
final String valueA = "valueA";
ParameterMetric metric = new ParameterMetric();
ParameterMetricStorage.getMetricsMap().put(resourceWrapper.getName(), metric);
metric.getRuleTimeCounterMap().put(rule, new ConcurrentLinkedHashMapWrapper<Object, AtomicLong>(4000));
metric.getRuleTokenCounterMap().put(rule,
new ConcurrentLinkedHashMapWrapper<Object, AtomicLong>(4000));
int threadCount = 40;
final CountDownLatch waitLatch = new CountDownLatch(threadCount);
final AtomicInteger successCount = new AtomicInteger();
for (int i = 0; i < threadCount; i++) {
Thread t = new Thread(new Runnable() {
@Override
public void run() {
if (ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA)) {
successCount.incrementAndGet();
}
waitLatch.countDown();
}
});
t.setName("sentinel-simulate-traffic-task-" + i);
t.start();
}
waitLatch.await();
assertEquals(successCount.get(), threshold);
successCount.set(0);
System.out.println("testParamFlowDefaultCheckSingleValueCheckQpsMultipleThreads: sleep for 3 seconds");
TimeUnit.SECONDS.sleep(3);
successCount.set(0);
final CountDownLatch waitLatch1 = new CountDownLatch(threadCount);
final long currentTime = TimeUtil.currentTimeMillis();
final long endTime = currentTime + rule.getDurationInSec() * 1000 - 1;
for (int i = 0; i < threadCount; i++) {
Thread t = new Thread(new Runnable() {
@Override
public void run() {
long currentTime1 = currentTime;
while (currentTime1 <= endTime) {
if (ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA)) {
successCount.incrementAndGet();
}
try {
TimeUnit.MILLISECONDS.sleep(ThreadLocalRandom.current().nextInt(20));
} catch (InterruptedException e) {
e.printStackTrace();
}
currentTime1 = TimeUtil.currentTimeMillis();
}
waitLatch1.countDown();
}
});
t.setName("sentinel-simulate-traffic-task-" + i);
t.start();
}
waitLatch1.await();
assertEquals(successCount.get(), threshold);
}
}
@Before
public void setUp() throws Exception {
ParameterMetricStorage.getMetricsMap().clear();
}
@After
public void tearDown() throws Exception {
ParameterMetricStorage.getMetricsMap().clear();
}
}
/*
* Copyright 1999-2024 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.csp.sentinel.slots.block.flow.param;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import com.alibaba.csp.sentinel.EntryType;
import com.alibaba.csp.sentinel.slotchain.ResourceWrapper;
import com.alibaba.csp.sentinel.slotchain.StringResourceWrapper;
import com.alibaba.csp.sentinel.slots.statistic.cache.ConcurrentLinkedHashMapWrapper;
import com.alibaba.csp.sentinel.block.flow.param.AbstractTimeBasedTest;
import com.alibaba.csp.sentinel.util.TimeUtil;
import org.mockito.MockedStatic;
/**
* @author jialiang.linjl
* @author Eric Zhao
*/
public class ParamFlowDefaultCheckerTest extends AbstractTimeBasedTest {
@Test
public void testCheckQpsWithLongIntervalAndHighThreshold() {
try (MockedStatic<TimeUtil> mocked = super.mockTimeUtil()) {
// This test case is intended to avoid number overflow.
final String resourceName = "testCheckQpsWithLongIntervalAndHighThreshold";
final ResourceWrapper resourceWrapper = new StringResourceWrapper(resourceName, EntryType.IN);
int paramIdx = 0;
// Set a large threshold.
long threshold = 25000L;
ParamFlowRule rule = new ParamFlowRule(resourceName)
.setCount(threshold)
.setParamIdx(paramIdx);
String valueA = "valueA";
ParameterMetric metric = new ParameterMetric();
ParameterMetricStorage.getMetricsMap().put(resourceWrapper.getName(), metric);
metric.getRuleTimeCounterMap().put(rule, new ConcurrentLinkedHashMapWrapper<Object, AtomicLong>(4000));
metric.getRuleTokenCounterMap().put(rule, new ConcurrentLinkedHashMapWrapper<>(4000));
// We mock the time directly to avoid unstable behaviour.
setCurrentMillis(mocked, System.currentTimeMillis());
assertTrue(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA));
assertTrue(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA));
// 24 hours passed.
// This can make `toAddCount` larger that Integer.MAX_VALUE.
sleep(mocked, 1000 * 60 * 60 * 24);
assertTrue(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA));
assertTrue(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA));
// 48 hours passed.
sleep(mocked, 1000 * 60 * 60 * 48);
assertTrue(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA));
assertTrue(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA));
}
}
@Test
public void testParamFlowDefaultCheckSingleQps() {
try (MockedStatic<TimeUtil> mocked = super.mockTimeUtil()) {
final String resourceName = "testParamFlowDefaultCheckSingleQps";
final ResourceWrapper resourceWrapper = new StringResourceWrapper(resourceName, EntryType.IN);
int paramIdx = 0;
long threshold = 5L;
ParamFlowRule rule = new ParamFlowRule();
rule.setResource(resourceName);
rule.setCount(threshold);
rule.setParamIdx(paramIdx);
String valueA = "valueA";
ParameterMetric metric = new ParameterMetric();
ParameterMetricStorage.getMetricsMap().put(resourceWrapper.getName(), metric);
metric.getRuleTimeCounterMap().put(rule, new ConcurrentLinkedHashMapWrapper<Object, AtomicLong>(4000));
metric.getRuleTokenCounterMap().put(rule, new ConcurrentLinkedHashMapWrapper<>(4000));
// We mock the time directly to avoid unstable behaviour.
setCurrentMillis(mocked, System.currentTimeMillis());
assertTrue(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA));
assertTrue(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA));
assertTrue(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA));
assertTrue(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA));
assertTrue(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA));
assertFalse(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA));
sleep(mocked, 3000);
assertTrue(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA));
assertTrue(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA));
assertTrue(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA));
assertTrue(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA));
assertTrue(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA));
assertFalse(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA));
}
}
@Test
public void testParamFlowDefaultCheckSingleQpsWithBurst() throws InterruptedException {
try (MockedStatic<TimeUtil> mocked = super.mockTimeUtil()) {
final String resourceName = "testParamFlowDefaultCheckSingleQpsWithBurst";
final ResourceWrapper resourceWrapper = new StringResourceWrapper(resourceName, EntryType.IN);
int paramIdx = 0;
long threshold = 5L;
ParamFlowRule rule = new ParamFlowRule();
rule.setResource(resourceName);
rule.setCount(threshold);
rule.setParamIdx(paramIdx);
rule.setBurstCount(3);
String valueA = "valueA";
ParameterMetric metric = new ParameterMetric();
ParameterMetricStorage.getMetricsMap().put(resourceWrapper.getName(), metric);
metric.getRuleTimeCounterMap().put(rule, new ConcurrentLinkedHashMapWrapper<Object, AtomicLong>(4000));
metric.getRuleTokenCounterMap().put(rule, new ConcurrentLinkedHashMapWrapper<>(4000));
// We mock the time directly to avoid unstable behaviour.
setCurrentMillis(mocked, System.currentTimeMillis());
assertTrue(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA));
assertTrue(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA));
assertTrue(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA));
assertTrue(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA));
assertTrue(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA));
assertTrue(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA));
assertTrue(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA));
assertTrue(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA));
assertFalse(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA));
sleep(mocked, 1002);
assertTrue(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA));
assertTrue(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA));
assertTrue(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA));
assertTrue(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA));
assertTrue(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA));
assertFalse(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA));
sleep(mocked, 1002);
assertTrue(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA));
assertTrue(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA));
assertTrue(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA));
assertTrue(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA));
assertTrue(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA));
assertFalse(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA));
sleep(mocked, 2000);
assertTrue(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA));
assertTrue(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA));
assertTrue(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA));
assertTrue(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA));
assertTrue(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA));
assertTrue(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA));
assertTrue(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA));
assertTrue(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA));
assertFalse(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA));
sleep(mocked, 1002);
assertTrue(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA));
assertTrue(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA));
assertTrue(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA));
assertTrue(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA));
assertTrue(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA));
assertFalse(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA));
}
}
@Test
public void testParamFlowDefaultCheckQpsInDifferentDuration() throws InterruptedException {
try (MockedStatic<TimeUtil> mocked = super.mockTimeUtil()) {
final String resourceName = "testParamFlowDefaultCheckQpsInDifferentDuration";
final ResourceWrapper resourceWrapper = new StringResourceWrapper(resourceName, EntryType.IN);
int paramIdx = 0;
long threshold = 5L;
ParamFlowRule rule = new ParamFlowRule();
rule.setResource(resourceName);
rule.setCount(threshold);
rule.setParamIdx(paramIdx);
rule.setDurationInSec(60);
String valueA = "helloWorld";
ParameterMetric metric = new ParameterMetric();
ParameterMetricStorage.getMetricsMap().put(resourceWrapper.getName(), metric);
metric.getRuleTimeCounterMap().put(rule, new ConcurrentLinkedHashMapWrapper<Object, AtomicLong>(4000));
metric.getRuleTokenCounterMap().put(rule, new ConcurrentLinkedHashMapWrapper<>(4000));
// We mock the time directly to avoid unstable behaviour.
setCurrentMillis(mocked, System.currentTimeMillis());
assertTrue(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA));
assertTrue(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA));
assertTrue(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA));
assertTrue(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA));
assertTrue(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA));
assertFalse(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA));
sleepSecond(mocked, 1);
assertFalse(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA));
sleepSecond(mocked, 10);
assertFalse(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA));
sleepSecond(mocked, 30);
assertFalse(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA));
sleepSecond(mocked, 30);
assertTrue(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA));
assertTrue(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA));
assertTrue(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA));
assertTrue(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA));
assertTrue(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA));
assertFalse(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA));
}
}
@Test
public void testParamFlowDefaultCheckSingleValueCheckQpsMultipleThreads() throws Exception {
final String resourceName = "testParamFlowDefaultCheckSingleValueCheckQpsMultipleThreads";
final ResourceWrapper resourceWrapper = new StringResourceWrapper(resourceName, EntryType.IN);
int paramIdx = 0;
long threshold = 5L;
final ParamFlowRule rule = new ParamFlowRule();
rule.setResource(resourceName);
rule.setCount(threshold);
rule.setParamIdx(paramIdx);
rule.setDurationInSec(3);
final String valueA = "valueA";
ParameterMetric metric = new ParameterMetric();
ParameterMetricStorage.getMetricsMap().put(resourceWrapper.getName(), metric);
metric.getRuleTimeCounterMap().put(rule, new ConcurrentLinkedHashMapWrapper<Object, AtomicLong>(4000));
metric.getRuleTokenCounterMap().put(rule, new ConcurrentLinkedHashMapWrapper<>(4000));
int threadCount = 40;
final CountDownLatch waitLatch = new CountDownLatch(threadCount);
final AtomicInteger successCount = new AtomicInteger();
for (int i = 0; i < threadCount; i++) {
Thread t = new Thread(() -> {
if (ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA)) {
successCount.incrementAndGet();
}
waitLatch.countDown();
});
t.setName("sentinel-simulate-traffic-task-" + i);
t.start();
}
waitLatch.await();
assertEquals(threshold, successCount.get());
successCount.set(0);
System.out.println("testParamFlowDefaultCheckSingleValueCheckQpsMultipleThreads: sleep for 3 seconds");
TimeUnit.SECONDS.sleep(3);
successCount.set(0);
final CountDownLatch waitLatch1 = new CountDownLatch(threadCount);
final long currentTime = TimeUtil.currentTimeMillis();
final long endTime = currentTime + rule.getDurationInSec() * 1000 - 500;
for (int i = 0; i < threadCount; i++) {
Thread t = new Thread(() -> {
while (TimeUtil.currentTimeMillis() <= endTime) {
if (ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA)) {
successCount.incrementAndGet();
}
try {
TimeUnit.MILLISECONDS.sleep(ThreadLocalRandom.current().nextInt(20));
} catch (InterruptedException e) {
e.printStackTrace();
}
}
waitLatch1.countDown();
});
t.setName("sentinel-simulate-traffic-task-" + i);
t.start();
}
waitLatch1.await();
assertEquals(threshold, successCount.get());
}
@Before
public void setUp() throws Exception {
ParameterMetricStorage.getMetricsMap().clear();
}
@After
public void tearDown() throws Exception {
ParameterMetricStorage.getMetricsMap().clear();
}
}

View File

@ -24,8 +24,8 @@ import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import java.util.Collections;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import org.junit.After;
import org.junit.Before;
@ -100,9 +100,9 @@ public class ParamFlowSlotTest {
ParameterMetric metric = mock(ParameterMetric.class);
CacheMap<Object, AtomicLong> map = new ConcurrentLinkedHashMapWrapper<>(4000);
CacheMap<Object, AtomicLong> map2 = new ConcurrentLinkedHashMapWrapper<>(4000);
CacheMap<Object, AtomicReference<TokenUpdateStatus>> map2 = new ConcurrentLinkedHashMapWrapper<>(4000);
when(metric.getRuleTimeCounter(rule)).thenReturn(map);
when(metric.getRuleTokenCounter(rule)).thenReturn(map2);
when(metric.getRuleStampedTokenCounter(rule)).thenReturn(map2);
map.put(argToGo, new AtomicLong(TimeUtil.currentTimeMillis()));
// Insert the mock metric to control pass or block.