diff --git a/sentinel-adapter/sentinel-apache-dubbo-adapter/pom.xml b/sentinel-adapter/sentinel-apache-dubbo-adapter/pom.xml index 356cae7c..97aa419b 100644 --- a/sentinel-adapter/sentinel-apache-dubbo-adapter/pom.xml +++ b/sentinel-adapter/sentinel-apache-dubbo-adapter/pom.xml @@ -38,7 +38,7 @@ org.mockito - mockito-core + mockito-inline test diff --git a/sentinel-adapter/sentinel-apache-dubbo-adapter/src/test/java/com/alibaba/csp/sentinel/BaseTest.java b/sentinel-adapter/sentinel-apache-dubbo-adapter/src/test/java/com/alibaba/csp/sentinel/BaseTest.java index db78c366..c7f843f1 100644 --- a/sentinel-adapter/sentinel-apache-dubbo-adapter/src/test/java/com/alibaba/csp/sentinel/BaseTest.java +++ b/sentinel-adapter/sentinel-apache-dubbo-adapter/src/test/java/com/alibaba/csp/sentinel/BaseTest.java @@ -1,5 +1,5 @@ /* - * Copyright 1999-2018 Alibaba Group Holding Ltd. + * Copyright 1999-2024 Alibaba Group Holding Ltd. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -15,6 +15,7 @@ */ package com.alibaba.csp.sentinel; +import com.alibaba.csp.sentinel.adapter.dubbo.AbstractTimeBasedTest; import com.alibaba.csp.sentinel.adapter.dubbo.config.DubboAdapterGlobalConfig; import com.alibaba.csp.sentinel.adapter.dubbo.fallback.DefaultDubboFallback; import com.alibaba.csp.sentinel.config.SentinelConfig; @@ -37,7 +38,7 @@ import java.util.ArrayList; * @author cdfive * @author lianglin */ -public class BaseTest { +public class BaseTest extends AbstractTimeBasedTest { /** diff --git a/sentinel-adapter/sentinel-apache-dubbo-adapter/src/test/java/com/alibaba/csp/sentinel/adapter/dubbo/AbstractTimeBasedTest.java b/sentinel-adapter/sentinel-apache-dubbo-adapter/src/test/java/com/alibaba/csp/sentinel/adapter/dubbo/AbstractTimeBasedTest.java new file mode 100644 index 00000000..d23f8002 --- /dev/null +++ b/sentinel-adapter/sentinel-apache-dubbo-adapter/src/test/java/com/alibaba/csp/sentinel/adapter/dubbo/AbstractTimeBasedTest.java @@ -0,0 +1,49 @@ +/* + * Copyright 1999-2024 Alibaba Group Holding Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alibaba.csp.sentinel.adapter.dubbo; + +import com.alibaba.csp.sentinel.util.TimeUtil; +import org.mockito.MockedStatic; +import org.mockito.Mockito; + +public abstract class AbstractTimeBasedTest { + + private long currentMillis = 0; + + public MockedStatic mockTimeUtil() { + MockedStatic mocked = Mockito.mockStatic(TimeUtil.class); + mocked.when(TimeUtil::currentTimeMillis).thenReturn(currentMillis); + return mocked; + } + + protected final void useActualTime(MockedStatic mocked) { + mocked.when(TimeUtil::currentTimeMillis).thenCallRealMethod(); + } + + protected final void setCurrentMillis(MockedStatic mocked, long cur) { + currentMillis = cur; + mocked.when(TimeUtil::currentTimeMillis).thenReturn(currentMillis); + } + + protected final void sleep(MockedStatic mocked, long timeInMs) { + currentMillis += timeInMs; + mocked.when(TimeUtil::currentTimeMillis).thenReturn(currentMillis); + } + + protected final void sleepSecond(MockedStatic mocked, long timeSec) { + sleep(mocked, timeSec * 1000); + } +} diff --git a/sentinel-adapter/sentinel-apache-dubbo-adapter/src/test/java/com/alibaba/csp/sentinel/adapter/dubbo/SentinelDubboConsumerFilterTest.java b/sentinel-adapter/sentinel-apache-dubbo-adapter/src/test/java/com/alibaba/csp/sentinel/adapter/dubbo/SentinelDubboConsumerFilterTest.java index a350a47d..4bebcbcc 100644 --- a/sentinel-adapter/sentinel-apache-dubbo-adapter/src/test/java/com/alibaba/csp/sentinel/adapter/dubbo/SentinelDubboConsumerFilterTest.java +++ b/sentinel-adapter/sentinel-apache-dubbo-adapter/src/test/java/com/alibaba/csp/sentinel/adapter/dubbo/SentinelDubboConsumerFilterTest.java @@ -1,5 +1,5 @@ /* - * Copyright 1999-2018 Alibaba Group Holding Ltd. + * Copyright 1999-2024 Alibaba Group Holding Ltd. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -36,11 +36,15 @@ import com.alibaba.csp.sentinel.slots.block.degrade.DegradeRuleManager; import com.alibaba.csp.sentinel.slots.block.flow.FlowRule; import com.alibaba.csp.sentinel.slots.block.flow.FlowRuleManager; +import com.alibaba.csp.sentinel.util.TimeUtil; import org.apache.dubbo.rpc.*; import org.apache.dubbo.rpc.support.RpcUtils; import org.junit.After; import org.junit.Before; import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.MockedStatic; +import org.mockito.junit.MockitoJUnitRunner; import java.util.*; @@ -53,6 +57,7 @@ import static org.mockito.Mockito.*; * @author cdfive * @author lianglin */ +@RunWith(MockitoJUnitRunner.class) public class SentinelDubboConsumerFilterTest extends BaseTest { private final SentinelDubboConsumerFilter consumerFilter = new SentinelDubboConsumerFilter(); @@ -94,62 +99,68 @@ public class SentinelDubboConsumerFilterTest extends BaseTest { @Test public void testDegradeAsync() throws InterruptedException { + try (MockedStatic mocked = super.mockTimeUtil()) { + setCurrentMillis(mocked, 1740000000000L); - Invocation invocation = DubboTestUtil.getDefaultMockInvocationOne(); - Invoker invoker = DubboTestUtil.getDefaultMockInvoker(); + Invocation invocation = DubboTestUtil.getDefaultMockInvocationOne(); + Invoker invoker = DubboTestUtil.getDefaultMockInvoker(); - when(invocation.getAttachment(ASYNC_KEY)).thenReturn(Boolean.TRUE.toString()); - initDegradeRule(DubboUtils.getInterfaceName(invoker)); + when(invocation.getAttachment(ASYNC_KEY)).thenReturn(Boolean.TRUE.toString()); + initDegradeRule(DubboUtils.getInterfaceName(invoker)); - Result result = invokeDubboRpc(false, invoker, invocation); - verifyInvocationStructureForCallFinish(invoker, invocation); - assertEquals("normal", result.getValue()); - - // inc the clusterNode's exception to trigger the fallback - for (int i = 0; i < 5; i++) { - invokeDubboRpc(true, invoker, invocation); + Result result = invokeDubboRpc(false, invoker, invocation); verifyInvocationStructureForCallFinish(invoker, invocation); + assertEquals("normal", result.getValue()); + + // inc the clusterNode's exception to trigger the fallback + for (int i = 0; i < 5; i++) { + invokeDubboRpc(true, invoker, invocation); + verifyInvocationStructureForCallFinish(invoker, invocation); + } + + Result result2 = invokeDubboRpc(false, invoker, invocation); + assertEquals("fallback", result2.getValue()); + + // sleeping 1000 ms to reset exception + sleep(mocked, 1000); + Result result3 = invokeDubboRpc(false, invoker, invocation); + assertEquals("normal", result3.getValue()); + + Context context = ContextUtil.getContext(); + assertNull(context); } - - Result result2 = invokeDubboRpc(false, invoker, invocation); - assertEquals("fallback", result2.getValue()); - - // sleeping 1000 ms to reset exception - Thread.sleep(1000); - Result result3 = invokeDubboRpc(false, invoker, invocation); - assertEquals("normal", result3.getValue()); - - Context context = ContextUtil.getContext(); - assertNull(context); } @Test - public void testDegradeSync() throws InterruptedException { + public void testDegradeSync() { + try (MockedStatic mocked = super.mockTimeUtil()) { + setCurrentMillis(mocked, 1740000000000L); - Invocation invocation = DubboTestUtil.getDefaultMockInvocationOne(); - Invoker invoker = DubboTestUtil.getDefaultMockInvoker(); - initDegradeRule(DubboUtils.getInterfaceName(invoker)); + Invocation invocation = DubboTestUtil.getDefaultMockInvocationOne(); + Invoker invoker = DubboTestUtil.getDefaultMockInvoker(); + initDegradeRule(DubboUtils.getInterfaceName(invoker)); - Result result = invokeDubboRpc(false, invoker, invocation); - verifyInvocationStructureForCallFinish(invoker, invocation); - assertEquals("normal", result.getValue()); - - // inc the clusterNode's exception to trigger the fallback - for (int i = 0; i < 5; i++) { - invokeDubboRpc(true, invoker, invocation); + Result result = invokeDubboRpc(false, invoker, invocation); verifyInvocationStructureForCallFinish(invoker, invocation); + assertEquals("normal", result.getValue()); + + // inc the clusterNode's exception to trigger the fallback + for (int i = 0; i < 5; i++) { + invokeDubboRpc(true, invoker, invocation); + verifyInvocationStructureForCallFinish(invoker, invocation); + } + + Result result2 = invokeDubboRpc(false, invoker, invocation); + assertEquals("fallback", result2.getValue()); + + // sleeping 1000 ms to reset exception + sleep(mocked, 1000); + Result result3 = invokeDubboRpc(false, invoker, invocation); + assertEquals("normal", result3.getValue()); + + Context context = ContextUtil.getContext(); + assertNull(context); } - - Result result2 = invokeDubboRpc(false, invoker, invocation); - assertEquals("fallback", result2.getValue()); - - // sleeping 1000 ms to reset exception - Thread.sleep(1000); - Result result3 = invokeDubboRpc(false, invoker, invocation); - assertEquals("normal", result3.getValue()); - - Context context = ContextUtil.getContext(); - assertNull(context); } @Test @@ -183,7 +194,6 @@ public class SentinelDubboConsumerFilterTest extends BaseTest { when(invocation.getAttachment(ASYNC_KEY)).thenReturn(Boolean.TRUE.toString()); final Result result = mock(Result.class); - when(result.hasException()).thenReturn(false); when(invoker.invoke(invocation)).thenAnswer(invocationOnMock -> { verifyInvocationStructureForAsyncCall(invoker, invocation); return result; @@ -203,7 +213,6 @@ public class SentinelDubboConsumerFilterTest extends BaseTest { final Result result = mock(Result.class); when(result.hasException()).thenReturn(false); - when(result.getException()).thenReturn(new Exception()); when(invoker.invoke(invocation)).thenAnswer(invocationOnMock -> { verifyInvocationStructure(invoker, invocation); return result; diff --git a/sentinel-adapter/sentinel-apache-dubbo3-adapter/pom.xml b/sentinel-adapter/sentinel-apache-dubbo3-adapter/pom.xml index 6752376c..af797414 100644 --- a/sentinel-adapter/sentinel-apache-dubbo3-adapter/pom.xml +++ b/sentinel-adapter/sentinel-apache-dubbo3-adapter/pom.xml @@ -38,7 +38,7 @@ org.mockito - mockito-core + mockito-inline test diff --git a/sentinel-adapter/sentinel-apache-dubbo3-adapter/src/test/java/com/alibaba/csp/sentinel/BaseTest.java b/sentinel-adapter/sentinel-apache-dubbo3-adapter/src/test/java/com/alibaba/csp/sentinel/BaseTest.java index e4642cc9..4e19aef2 100644 --- a/sentinel-adapter/sentinel-apache-dubbo3-adapter/src/test/java/com/alibaba/csp/sentinel/BaseTest.java +++ b/sentinel-adapter/sentinel-apache-dubbo3-adapter/src/test/java/com/alibaba/csp/sentinel/BaseTest.java @@ -1,5 +1,5 @@ /* - * Copyright 1999-2018 Alibaba Group Holding Ltd. + * Copyright 1999-2024 Alibaba Group Holding Ltd. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -15,6 +15,7 @@ */ package com.alibaba.csp.sentinel; +import com.alibaba.csp.sentinel.adapter.dubbo3.AbstractTimeBasedTest; import com.alibaba.csp.sentinel.adapter.dubbo3.config.DubboAdapterGlobalConfig; import com.alibaba.csp.sentinel.adapter.dubbo3.fallback.DefaultDubboFallback; import com.alibaba.csp.sentinel.config.SentinelConfig; @@ -37,7 +38,7 @@ import java.util.ArrayList; * @author cdfive * @author lianglin */ -public class BaseTest { +public class BaseTest extends AbstractTimeBasedTest { /** diff --git a/sentinel-adapter/sentinel-apache-dubbo3-adapter/src/test/java/com/alibaba/csp/sentinel/adapter/dubbo3/AbstractTimeBasedTest.java b/sentinel-adapter/sentinel-apache-dubbo3-adapter/src/test/java/com/alibaba/csp/sentinel/adapter/dubbo3/AbstractTimeBasedTest.java new file mode 100644 index 00000000..d2a0c421 --- /dev/null +++ b/sentinel-adapter/sentinel-apache-dubbo3-adapter/src/test/java/com/alibaba/csp/sentinel/adapter/dubbo3/AbstractTimeBasedTest.java @@ -0,0 +1,49 @@ +/* + * Copyright 1999-2024 Alibaba Group Holding Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alibaba.csp.sentinel.adapter.dubbo3; + +import com.alibaba.csp.sentinel.util.TimeUtil; +import org.mockito.MockedStatic; +import org.mockito.Mockito; + +public abstract class AbstractTimeBasedTest { + + private long currentMillis = 0; + + public MockedStatic mockTimeUtil() { + MockedStatic mocked = Mockito.mockStatic(TimeUtil.class); + mocked.when(TimeUtil::currentTimeMillis).thenReturn(currentMillis); + return mocked; + } + + protected final void useActualTime(MockedStatic mocked) { + mocked.when(TimeUtil::currentTimeMillis).thenCallRealMethod(); + } + + protected final void setCurrentMillis(MockedStatic mocked, long cur) { + currentMillis = cur; + mocked.when(TimeUtil::currentTimeMillis).thenReturn(currentMillis); + } + + protected final void sleep(MockedStatic mocked, long timeInMs) { + currentMillis += timeInMs; + mocked.when(TimeUtil::currentTimeMillis).thenReturn(currentMillis); + } + + protected final void sleepSecond(MockedStatic mocked, long timeSec) { + sleep(mocked, timeSec * 1000); + } +} diff --git a/sentinel-adapter/sentinel-apache-dubbo3-adapter/src/test/java/com/alibaba/csp/sentinel/adapter/dubbo3/SentinelDubboConsumerFilterTest.java b/sentinel-adapter/sentinel-apache-dubbo3-adapter/src/test/java/com/alibaba/csp/sentinel/adapter/dubbo3/SentinelDubboConsumerFilterTest.java index a91d67d4..b2cd294f 100644 --- a/sentinel-adapter/sentinel-apache-dubbo3-adapter/src/test/java/com/alibaba/csp/sentinel/adapter/dubbo3/SentinelDubboConsumerFilterTest.java +++ b/sentinel-adapter/sentinel-apache-dubbo3-adapter/src/test/java/com/alibaba/csp/sentinel/adapter/dubbo3/SentinelDubboConsumerFilterTest.java @@ -1,5 +1,5 @@ /* - * Copyright 1999-2018 Alibaba Group Holding Ltd. + * Copyright 1999-2024 Alibaba Group Holding Ltd. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -32,12 +32,15 @@ import com.alibaba.csp.sentinel.slots.block.degrade.DegradeRule; import com.alibaba.csp.sentinel.slots.block.degrade.DegradeRuleManager; import com.alibaba.csp.sentinel.slots.block.flow.FlowRule; import com.alibaba.csp.sentinel.slots.block.flow.FlowRuleManager; - +import com.alibaba.csp.sentinel.util.TimeUtil; import org.apache.dubbo.rpc.*; import org.apache.dubbo.rpc.support.RpcUtils; import org.junit.After; import org.junit.Before; import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.MockedStatic; +import org.mockito.junit.MockitoJUnitRunner; import java.util.*; @@ -50,6 +53,7 @@ import static org.mockito.Mockito.*; * @author cdfive * @author lianglin */ +@RunWith(MockitoJUnitRunner.class) public class SentinelDubboConsumerFilterTest extends BaseTest { private final SentinelDubboConsumerFilter consumerFilter = new SentinelDubboConsumerFilter(); @@ -91,62 +95,67 @@ public class SentinelDubboConsumerFilterTest extends BaseTest { @Test public void testDegradeAsync() throws InterruptedException { + try (MockedStatic mocked = super.mockTimeUtil()) { + setCurrentMillis(mocked, 1740000000000L); + Invocation invocation = DubboTestUtil.getDefaultMockInvocationOne(); + Invoker invoker = DubboTestUtil.getDefaultMockInvoker(); - Invocation invocation = DubboTestUtil.getDefaultMockInvocationOne(); - Invoker invoker = DubboTestUtil.getDefaultMockInvoker(); + when(invocation.getAttachment(ASYNC_KEY)).thenReturn(Boolean.TRUE.toString()); + initDegradeRule(DubboUtils.getInterfaceName(invoker)); - when(invocation.getAttachment(ASYNC_KEY)).thenReturn(Boolean.TRUE.toString()); - initDegradeRule(DubboUtils.getInterfaceName(invoker)); - - Result result = invokeDubboRpc(false, invoker, invocation); - verifyInvocationStructureForCallFinish(invoker, invocation); - assertEquals("normal", result.getValue()); - - // inc the clusterNode's exception to trigger the fallback - for (int i = 0; i < 5; i++) { - invokeDubboRpc(true, invoker, invocation); + Result result = invokeDubboRpc(false, invoker, invocation); verifyInvocationStructureForCallFinish(invoker, invocation); + assertEquals("normal", result.getValue()); + + // inc the clusterNode's exception to trigger the fallback + for (int i = 0; i < 5; i++) { + invokeDubboRpc(true, invoker, invocation); + verifyInvocationStructureForCallFinish(invoker, invocation); + } + + Result result2 = invokeDubboRpc(false, invoker, invocation); + assertEquals("fallback", result2.getValue()); + + // sleeping 1000 ms to reset exception + sleep(mocked, 1000); + Result result3 = invokeDubboRpc(false, invoker, invocation); + assertEquals("normal", result3.getValue()); + + Context context = ContextUtil.getContext(); + assertNull(context); } - - Result result2 = invokeDubboRpc(false, invoker, invocation); - assertEquals("fallback", result2.getValue()); - - // sleeping 1000 ms to reset exception - Thread.sleep(1000); - Result result3 = invokeDubboRpc(false, invoker, invocation); - assertEquals("normal", result3.getValue()); - - Context context = ContextUtil.getContext(); - assertNull(context); } @Test - public void testDegradeSync() throws InterruptedException { + public void testDegradeSync() { + try (MockedStatic mocked = super.mockTimeUtil()) { + setCurrentMillis(mocked, 1750000000000L); - Invocation invocation = DubboTestUtil.getDefaultMockInvocationOne(); - Invoker invoker = DubboTestUtil.getDefaultMockInvoker(); - initDegradeRule(DubboUtils.getInterfaceName(invoker)); + Invocation invocation = DubboTestUtil.getDefaultMockInvocationOne(); + Invoker invoker = DubboTestUtil.getDefaultMockInvoker(); + initDegradeRule(DubboUtils.getInterfaceName(invoker)); - Result result = invokeDubboRpc(false, invoker, invocation); - verifyInvocationStructureForCallFinish(invoker, invocation); - assertEquals("normal", result.getValue()); - - // inc the clusterNode's exception to trigger the fallback - for (int i = 0; i < 5; i++) { - invokeDubboRpc(true, invoker, invocation); + Result result = invokeDubboRpc(false, invoker, invocation); verifyInvocationStructureForCallFinish(invoker, invocation); + assertEquals("normal", result.getValue()); + + // inc the clusterNode's exception to trigger the fallback + for (int i = 0; i < 5; i++) { + invokeDubboRpc(true, invoker, invocation); + verifyInvocationStructureForCallFinish(invoker, invocation); + } + + Result result2 = invokeDubboRpc(false, invoker, invocation); + assertEquals("fallback", result2.getValue()); + + // sleeping 1000 ms to reset exception + sleep(mocked, 1000); + Result result3 = invokeDubboRpc(false, invoker, invocation); + assertEquals("normal", result3.getValue()); + + Context context = ContextUtil.getContext(); + assertNull(context); } - - Result result2 = invokeDubboRpc(false, invoker, invocation); - assertEquals("fallback", result2.getValue()); - - // sleeping 1000 ms to reset exception - Thread.sleep(1000); - Result result3 = invokeDubboRpc(false, invoker, invocation); - assertEquals("normal", result3.getValue()); - - Context context = ContextUtil.getContext(); - assertNull(context); } @Test @@ -180,7 +189,6 @@ public class SentinelDubboConsumerFilterTest extends BaseTest { when(invocation.getAttachment(ASYNC_KEY)).thenReturn(Boolean.TRUE.toString()); final Result result = mock(Result.class); - when(result.hasException()).thenReturn(false); when(invoker.invoke(invocation)).thenAnswer(invocationOnMock -> { verifyInvocationStructureForAsyncCall(invoker, invocation); return result; @@ -200,7 +208,6 @@ public class SentinelDubboConsumerFilterTest extends BaseTest { final Result result = mock(Result.class); when(result.hasException()).thenReturn(false); - when(result.getException()).thenReturn(new Exception()); when(invoker.invoke(invocation)).thenAnswer(invocationOnMock -> { verifyInvocationStructure(invoker, invocation); return result; @@ -263,7 +270,7 @@ public class SentinelDubboConsumerFilterTest extends BaseTest { ClusterNode methodClusterNode = methodNode.getClusterNode(); ClusterNode interfaceClusterNode = interfaceNode.getClusterNode(); assertNotSame(methodClusterNode, - interfaceClusterNode);// Different resource->Different ProcessorSlot->Different ClusterNode + interfaceClusterNode);// Different resource->Different ProcessorSlot->Different ClusterNode // As context origin is "", the StatisticNode should not be created in originCountMap of ClusterNode Map methodOriginCountMap = methodClusterNode.getOriginCountMap(); @@ -315,7 +322,7 @@ public class SentinelDubboConsumerFilterTest extends BaseTest { ClusterNode methodClusterNode = methodNode.getClusterNode(); ClusterNode interfaceClusterNode = interfaceNode.getClusterNode(); assertNotSame(methodClusterNode, - interfaceClusterNode);// Different resource->Different ProcessorSlot->Different ClusterNode + interfaceClusterNode);// Different resource->Different ProcessorSlot->Different ClusterNode // As context origin is "", the StatisticNode should not be created in originCountMap of ClusterNode Map methodOriginCountMap = methodClusterNode.getOriginCountMap(); @@ -360,8 +367,8 @@ public class SentinelDubboConsumerFilterTest extends BaseTest { private void initDegradeRule(String resource) { DegradeRule degradeRule = new DegradeRule(resource) - .setCount(0.5) - .setGrade(DEGRADE_GRADE_EXCEPTION_RATIO); + .setCount(0.5) + .setGrade(DEGRADE_GRADE_EXCEPTION_RATIO); List degradeRules = new ArrayList<>(); degradeRules.add(degradeRule); degradeRule.setTimeWindow(1); @@ -382,7 +389,7 @@ public class SentinelDubboConsumerFilterTest extends BaseTest { result = exception ? new AppResponse(new Exception("error")) : new AppResponse("normal"); } else { result = exception ? AsyncRpcResult.newDefaultAsyncResult(new Exception("error"), invocation) - : AsyncRpcResult.newDefaultAsyncResult("normal", invocation); + : AsyncRpcResult.newDefaultAsyncResult("normal", invocation); } when(invoker.invoke(invocation)).thenReturn(result); return consumerFilter.invoke(invoker, invocation); diff --git a/sentinel-demo/sentinel-demo-quarkus/src/main/java/com/alibaba/csp/sentinel/demo/quarkus/GreetingResource.java b/sentinel-demo/sentinel-demo-quarkus/src/main/java/com/alibaba/csp/sentinel/demo/quarkus/GreetingResource.java index b716e530..f9deb561 100644 --- a/sentinel-demo/sentinel-demo-quarkus/src/main/java/com/alibaba/csp/sentinel/demo/quarkus/GreetingResource.java +++ b/sentinel-demo/sentinel-demo-quarkus/src/main/java/com/alibaba/csp/sentinel/demo/quarkus/GreetingResource.java @@ -1,5 +1,5 @@ /* - * Copyright 1999-2020 Alibaba Group Holding Ltd. + * Copyright 1999-2024 Alibaba Group Holding Ltd. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -37,7 +37,7 @@ public class GreetingResource { @Path("/txt") @Produces(MediaType.TEXT_PLAIN) public String hello() throws InterruptedException { - TimeUnit.MILLISECONDS.sleep(500); + TimeUnit.MILLISECONDS.sleep(300); return "hello"; } diff --git a/sentinel-extension/sentinel-datasource-consul/pom.xml b/sentinel-extension/sentinel-datasource-consul/pom.xml index 20e4fac1..276842a2 100644 --- a/sentinel-extension/sentinel-datasource-consul/pom.xml +++ b/sentinel-extension/sentinel-datasource-consul/pom.xml @@ -16,7 +16,6 @@ 1.8 1.8 1.4.5 - 2.2.0 @@ -36,9 +35,9 @@ ${consul.version} - com.pszymczyk.consul - embedded-consul - ${consul.process.version} + org.testcontainers + consul + 1.19.7 test diff --git a/sentinel-extension/sentinel-datasource-consul/src/test/java/com/alibaba/csp/sentinel/datasource/consul/ConsulDataSourceTest.java b/sentinel-extension/sentinel-datasource-consul/src/test/java/com/alibaba/csp/sentinel/datasource/consul/ConsulDataSourceTest.java index f5b531c6..416b518d 100644 --- a/sentinel-extension/sentinel-datasource-consul/src/test/java/com/alibaba/csp/sentinel/datasource/consul/ConsulDataSourceTest.java +++ b/sentinel-extension/sentinel-datasource-consul/src/test/java/com/alibaba/csp/sentinel/datasource/consul/ConsulDataSourceTest.java @@ -1,5 +1,5 @@ /* - * Copyright 1999-2018 Alibaba Group Holding Ltd. + * Copyright 1999-2024 Alibaba Group Holding Ltd. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -21,15 +21,10 @@ import com.alibaba.csp.sentinel.slots.block.flow.FlowRule; import com.alibaba.csp.sentinel.slots.block.flow.FlowRuleManager; import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.TypeReference; - import com.ecwid.consul.v1.ConsulClient; import com.ecwid.consul.v1.Response; -import com.pszymczyk.consul.ConsulProcess; -import com.pszymczyk.consul.ConsulStarterBuilder; -import org.junit.After; -import org.junit.Assert; -import org.junit.Before; -import org.junit.Test; +import org.junit.*; +import org.testcontainers.consul.ConsulContainer; import java.util.ArrayList; import java.util.List; @@ -40,11 +35,12 @@ import java.util.concurrent.TimeUnit; * @author wavesZh */ public class ConsulDataSourceTest { + @ClassRule + public static ConsulContainer consulContainer = new ConsulContainer("hashicorp/consul:1.15"); private final String ruleKey = "sentinel.rules.flow.ruleKey"; private final int waitTimeoutInSecond = 1; - private ConsulProcess consul; private ConsulClient client; private ReadableDataSource> consulDataSource; @@ -53,11 +49,8 @@ public class ConsulDataSourceTest { @Before public void init() { - this.consul = ConsulStarterBuilder.consulStarter() - .build() - .start(); - int port = consul.getHttpPort(); - String host = "127.0.0.1"; + int port = consulContainer.getMappedPort(8500); + String host = consulContainer.getHost(); client = new ConsulClient(host, port); Converter> flowConfigParser = buildFlowConfigParser(); String flowRulesJson = @@ -76,9 +69,6 @@ public class ConsulDataSourceTest { if (consulDataSource != null) { consulDataSource.close(); } - if (consul != null) { - consul.close(); - } FlowRuleManager.loadRules(new ArrayList<>()); } diff --git a/sentinel-extension/sentinel-parameter-flow-control/src/main/java/com/alibaba/csp/sentinel/slots/block/flow/param/ParamFlowChecker.java b/sentinel-extension/sentinel-parameter-flow-control/src/main/java/com/alibaba/csp/sentinel/slots/block/flow/param/ParamFlowChecker.java index bd5ed8a0..de7089b0 100644 --- a/sentinel-extension/sentinel-parameter-flow-control/src/main/java/com/alibaba/csp/sentinel/slots/block/flow/param/ParamFlowChecker.java +++ b/sentinel-extension/sentinel-parameter-flow-control/src/main/java/com/alibaba/csp/sentinel/slots/block/flow/param/ParamFlowChecker.java @@ -1,5 +1,5 @@ /* - * Copyright 1999-2018 Alibaba Group Holding Ltd. + * Copyright 1999-2024 Alibaba Group Holding Ltd. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -15,16 +15,6 @@ */ package com.alibaba.csp.sentinel.slots.block.flow.param; -import java.lang.reflect.Array; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.List; -import java.util.Set; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicLong; - import com.alibaba.csp.sentinel.cluster.ClusterStateManager; import com.alibaba.csp.sentinel.cluster.TokenResult; import com.alibaba.csp.sentinel.cluster.TokenResultStatus; @@ -37,6 +27,13 @@ import com.alibaba.csp.sentinel.slots.block.RuleConstant; import com.alibaba.csp.sentinel.slots.statistic.cache.CacheMap; import com.alibaba.csp.sentinel.util.TimeUtil; +import java.lang.reflect.Array; +import java.time.format.DateTimeFormatter; +import java.util.*; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; + /** * Rule checker for parameter flow control. * @@ -46,7 +43,7 @@ import com.alibaba.csp.sentinel.util.TimeUtil; public final class ParamFlowChecker { public static boolean passCheck(ResourceWrapper resourceWrapper, /*@Valid*/ ParamFlowRule rule, /*@Valid*/ int count, - Object... args) { + Object... args) { if (args == null) { return true; } @@ -79,7 +76,7 @@ public final class ParamFlowChecker { Object value) { try { if (Collection.class.isAssignableFrom(value.getClass())) { - for (Object param : ((Collection)value)) { + for (Object param : ((Collection) value)) { if (!passSingleValueCheck(resourceWrapper, rule, count, param)) { return false; } @@ -117,7 +114,7 @@ public final class ParamFlowChecker { int itemThreshold = rule.getParsedHotItems().get(value); return ++threadCount <= itemThreshold; } - long threshold = (long)rule.getCount(); + long threshold = (long) rule.getCount(); return ++threadCount <= threshold; } @@ -127,16 +124,16 @@ public final class ParamFlowChecker { static boolean passDefaultLocalCheck(ResourceWrapper resourceWrapper, ParamFlowRule rule, int acquireCount, Object value) { ParameterMetric metric = getParameterMetric(resourceWrapper); - CacheMap tokenCounters = metric == null ? null : metric.getRuleTokenCounter(rule); - CacheMap timeCounters = metric == null ? null : metric.getRuleTimeCounter(rule); + CacheMap> tokenCounters = metric == null ? null : metric.getRuleStampedTokenCounter(rule); - if (tokenCounters == null || timeCounters == null) { + DateTimeFormatter dtf = DateTimeFormatter.ISO_DATE_TIME; + if (tokenCounters == null) { return true; } // Calculate max token count (threshold) Set exclusionItems = rule.getParsedHotItems().keySet(); - long tokenCount = (long)rule.getCount(); + long tokenCount = (long) rule.getCount(); if (exclusionItems.contains(value)) { tokenCount = rule.getParsedHotItems().get(value); } @@ -153,49 +150,44 @@ public final class ParamFlowChecker { while (true) { long currentTime = TimeUtil.currentTimeMillis(); - AtomicLong lastAddTokenTime = timeCounters.putIfAbsent(value, new AtomicLong(currentTime)); - if (lastAddTokenTime == null) { + AtomicReference atomicLastStatus = tokenCounters.putIfAbsent(value, new AtomicReference<>( + new TokenUpdateStatus(currentTime, maxCount - acquireCount) + )); + if (atomicLastStatus == null) { // Token never added, just replenish the tokens and consume {@code acquireCount} immediately. - tokenCounters.putIfAbsent(value, new AtomicLong(maxCount - acquireCount)); return true; } // Calculate the time duration since last token was added. - long passTime = currentTime - lastAddTokenTime.get(); + TokenUpdateStatus lastStatus = atomicLastStatus.get(); + long passTime = currentTime - lastStatus.getLastAddTokenTime(); // A simplified token bucket algorithm that will replenish the tokens only when statistic window has passed. + long newQps; if (passTime > rule.getDurationInSec() * 1000) { - AtomicLong oldQps = tokenCounters.putIfAbsent(value, new AtomicLong(maxCount - acquireCount)); - if (oldQps == null) { - // Might not be accurate here. - lastAddTokenTime.set(currentTime); - return true; - } else { - long restQps = oldQps.get(); - long toAddCount = (passTime * tokenCount) / (rule.getDurationInSec() * 1000); - long newQps = toAddCount + restQps > maxCount ? (maxCount - acquireCount) + long restQps = lastStatus.getRestQps(); + long toAddCount = (passTime * tokenCount) / (rule.getDurationInSec() * 1000); + newQps = toAddCount + restQps > maxCount ? (maxCount - acquireCount) : (restQps + toAddCount - acquireCount); - if (newQps < 0) { - return false; - } - if (oldQps.compareAndSet(restQps, newQps)) { - lastAddTokenTime.set(currentTime); + if (newQps < 0) { + return false; + } + TokenUpdateStatus newStatus = new TokenUpdateStatus(currentTime, newQps); + if (atomicLastStatus.compareAndSet(lastStatus, newStatus)) { + return true; + } + Thread.yield(); + } else { + newQps = lastStatus.getRestQps() - acquireCount; + if (newQps >= 0) { + TokenUpdateStatus newStatus = new TokenUpdateStatus(lastStatus.getLastAddTokenTime(), newQps); + if (atomicLastStatus.compareAndSet(lastStatus, newStatus)) { return true; } - Thread.yield(); - } - } else { - AtomicLong oldQps = tokenCounters.get(value); - if (oldQps != null) { - long oldQpsValue = oldQps.get(); - if (oldQpsValue - acquireCount >= 0) { - if (oldQps.compareAndSet(oldQpsValue, oldQpsValue - acquireCount)) { - return true; - } - } else { - return false; - } + } else { + return false; } + Thread.yield(); } } @@ -211,7 +203,7 @@ public final class ParamFlowChecker { // Calculate max token count (threshold) Set exclusionItems = rule.getParsedHotItems().keySet(); - long tokenCount = (long)rule.getCount(); + long tokenCount = (long) rule.getCount(); if (exclusionItems.contains(value)) { tokenCount = rule.getParsedHotItems().get(value); } @@ -261,7 +253,7 @@ public final class ParamFlowChecker { @SuppressWarnings("unchecked") private static Collection toCollection(Object value) { if (value instanceof Collection) { - return (Collection)value; + return (Collection) value; } else if (value.getClass().isArray()) { List params = new ArrayList(); int length = Array.getLength(value); diff --git a/sentinel-extension/sentinel-parameter-flow-control/src/main/java/com/alibaba/csp/sentinel/slots/block/flow/param/ParameterMetric.java b/sentinel-extension/sentinel-parameter-flow-control/src/main/java/com/alibaba/csp/sentinel/slots/block/flow/param/ParameterMetric.java index ee29728a..a5ab902c 100644 --- a/sentinel-extension/sentinel-parameter-flow-control/src/main/java/com/alibaba/csp/sentinel/slots/block/flow/param/ParameterMetric.java +++ b/sentinel-extension/sentinel-parameter-flow-control/src/main/java/com/alibaba/csp/sentinel/slots/block/flow/param/ParameterMetric.java @@ -21,6 +21,7 @@ import java.util.HashMap; import java.util.Map; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; import com.alibaba.csp.sentinel.log.RecordLog; import com.alibaba.csp.sentinel.slots.statistic.cache.CacheMap; @@ -46,12 +47,14 @@ public class ParameterMetric { * @since 1.6.0 */ private final Map> ruleTimeCounters = new HashMap<>(); + /** * Format: (rule, (value, tokenCounter)) * * @since 1.6.0 */ - private final Map> ruleTokenCounter = new HashMap<>(); + private final Map>> ruleTokenCounter = new HashMap<>(); + private final Map> threadCountMap = new HashMap<>(); /** @@ -59,12 +62,20 @@ public class ParameterMetric { * * @param rule valid parameter rule * @return the associated token counter - * @since 1.6.0 + * @since 1.8.8 */ - public CacheMap getRuleTokenCounter(ParamFlowRule rule) { + CacheMap> getRuleStampedTokenCounter(ParamFlowRule rule) { return ruleTokenCounter.get(rule); } + public void clear() { + synchronized (lock) { + ruleTimeCounters.clear(); + ruleTokenCounter.clear(); + threadCountMap.clear(); + } + } + /** * Get the time record counter for given parameter rule. * @@ -76,14 +87,6 @@ public class ParameterMetric { return ruleTimeCounters.get(rule); } - public void clear() { - synchronized (lock) { - threadCountMap.clear(); - ruleTimeCounters.clear(); - ruleTokenCounter.clear(); - } - } - public void clearForRule(ParamFlowRule rule) { synchronized (lock) { ruleTimeCounters.remove(rule); @@ -106,7 +109,7 @@ public class ParameterMetric { synchronized (lock) { if (ruleTokenCounter.get(rule) == null) { long size = Math.min(BASE_PARAM_MAX_CAPACITY * rule.getDurationInSec(), TOTAL_MAX_CAPACITY); - ruleTokenCounter.put(rule, new ConcurrentLinkedHashMapWrapper(size)); + ruleTokenCounter.put(rule, new ConcurrentLinkedHashMapWrapper<>(size)); } } } @@ -253,7 +256,7 @@ public class ParameterMetric { * * @return the token counter map */ - Map> getRuleTokenCounterMap() { + Map>> getRuleTokenCounterMap() { return ruleTokenCounter; } diff --git a/sentinel-extension/sentinel-parameter-flow-control/src/main/java/com/alibaba/csp/sentinel/slots/block/flow/param/TokenUpdateStatus.java b/sentinel-extension/sentinel-parameter-flow-control/src/main/java/com/alibaba/csp/sentinel/slots/block/flow/param/TokenUpdateStatus.java new file mode 100644 index 00000000..a9864e5e --- /dev/null +++ b/sentinel-extension/sentinel-parameter-flow-control/src/main/java/com/alibaba/csp/sentinel/slots/block/flow/param/TokenUpdateStatus.java @@ -0,0 +1,30 @@ +package com.alibaba.csp.sentinel.slots.block.flow.param; + +class TokenUpdateStatus { + + private final long lastAddTokenTime; + + private final long restQps; + + public TokenUpdateStatus(long lastAddTokenTime, long restQps) { + this.lastAddTokenTime = lastAddTokenTime; + this.restQps = restQps; + } + + public long getLastAddTokenTime() { + return lastAddTokenTime; + } + + public long getRestQps() { + return restQps; + } + + @Override + public String toString() { + return "TokenUpdateStatus{" + + "hash=" + System.identityHashCode(this) + + ", lastAddTokenTime=" + lastAddTokenTime + + ", requestCount=" + restQps + + '}'; + } +} diff --git a/sentinel-extension/sentinel-parameter-flow-control/src/test/java/com/alibaba/csp/sentinel/block/flow/param/AbstractTimeBasedTest.java b/sentinel-extension/sentinel-parameter-flow-control/src/test/java/com/alibaba/csp/sentinel/block/flow/param/AbstractTimeBasedTest.java index 6a6358a9..2e16b3cc 100644 --- a/sentinel-extension/sentinel-parameter-flow-control/src/test/java/com/alibaba/csp/sentinel/block/flow/param/AbstractTimeBasedTest.java +++ b/sentinel-extension/sentinel-parameter-flow-control/src/test/java/com/alibaba/csp/sentinel/block/flow/param/AbstractTimeBasedTest.java @@ -1,5 +1,5 @@ /* - * Copyright 1999-2018 Alibaba Group Holding Ltd. + * Copyright 1999-2024 Alibaba Group Holding Ltd. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -15,19 +15,10 @@ */ package com.alibaba.csp.sentinel.block.flow.param; -import org.junit.runner.RunWith; +import com.alibaba.csp.sentinel.util.TimeUtil; import org.mockito.MockedStatic; import org.mockito.Mockito; -import org.mockito.junit.MockitoJUnitRunner; -import com.alibaba.csp.sentinel.util.TimeUtil; - -/** - * Mock support for {@link TimeUtil}. - * - * @author jason - */ -@RunWith(MockitoJUnitRunner.class) public abstract class AbstractTimeBasedTest { private long currentMillis = 0; diff --git a/sentinel-extension/sentinel-parameter-flow-control/src/test/java/com/alibaba/csp/sentinel/slots/block/flow/param/ParamFlowCheckerTest.java b/sentinel-extension/sentinel-parameter-flow-control/src/test/java/com/alibaba/csp/sentinel/slots/block/flow/param/ParamFlowCheckerTest.java index bc545c6f..a3f8f260 100644 --- a/sentinel-extension/sentinel-parameter-flow-control/src/test/java/com/alibaba/csp/sentinel/slots/block/flow/param/ParamFlowCheckerTest.java +++ b/sentinel-extension/sentinel-parameter-flow-control/src/test/java/com/alibaba/csp/sentinel/slots/block/flow/param/ParamFlowCheckerTest.java @@ -31,6 +31,7 @@ import java.util.List; import java.util.Map; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; @@ -158,7 +159,7 @@ public class ParamFlowCheckerTest { ParameterMetric metric = new ParameterMetric(); ParameterMetricStorage.getMetricsMap().put(resourceWrapper.getName(), metric); metric.getRuleTimeCounterMap().put(rule, new ConcurrentLinkedHashMapWrapper(4000)); - metric.getRuleTokenCounterMap().put(rule, new ConcurrentLinkedHashMapWrapper(4000)); + metric.getRuleTokenCounterMap().put(rule, new ConcurrentLinkedHashMapWrapper<>(4000)); assertTrue(ParamFlowChecker.passCheck(resourceWrapper, rule, 1, list)); assertFalse(ParamFlowChecker.passCheck(resourceWrapper, rule, 1, list)); @@ -215,7 +216,7 @@ public class ParamFlowCheckerTest { ParameterMetric metric = new ParameterMetric(); ParameterMetricStorage.getMetricsMap().put(resourceWrapper.getName(), metric); metric.getRuleTimeCounterMap().put(rule, new ConcurrentLinkedHashMapWrapper(4000)); - metric.getRuleTokenCounterMap().put(rule, new ConcurrentLinkedHashMapWrapper(4000)); + metric.getRuleTokenCounterMap().put(rule, new ConcurrentLinkedHashMapWrapper<>(4000)); assertTrue(ParamFlowChecker.passCheck(resourceWrapper, rule, 1, args)); assertFalse(ParamFlowChecker.passCheck(resourceWrapper, rule, 1, args)); diff --git a/sentinel-extension/sentinel-parameter-flow-control/src/test/java/com/alibaba/csp/sentinel/slots/block/flow/param/ParamFlowDefaultCheckerTest.java b/sentinel-extension/sentinel-parameter-flow-control/src/test/java/com/alibaba/csp/sentinel/slots/block/flow/param/ParamFlowDefaultCheckerTest.java index 92fd2d3b..d809e3fd 100644 --- a/sentinel-extension/sentinel-parameter-flow-control/src/test/java/com/alibaba/csp/sentinel/slots/block/flow/param/ParamFlowDefaultCheckerTest.java +++ b/sentinel-extension/sentinel-parameter-flow-control/src/test/java/com/alibaba/csp/sentinel/slots/block/flow/param/ParamFlowDefaultCheckerTest.java @@ -1,346 +1,327 @@ -/* - * Copyright 1999-2019 Alibaba Group Holding Ltd. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.alibaba.csp.sentinel.slots.block.flow.param; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; - -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ThreadLocalRandom; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicLong; - -import org.junit.After; -import org.junit.Before; -import org.junit.Test; - -import com.alibaba.csp.sentinel.EntryType; -import com.alibaba.csp.sentinel.slotchain.ResourceWrapper; -import com.alibaba.csp.sentinel.slotchain.StringResourceWrapper; -import com.alibaba.csp.sentinel.slots.statistic.cache.ConcurrentLinkedHashMapWrapper; -import com.alibaba.csp.sentinel.block.flow.param.AbstractTimeBasedTest; -import com.alibaba.csp.sentinel.util.TimeUtil; -import org.mockito.MockedStatic; - -/** - * @author jialiang.linjl - * @author Eric Zhao - */ -public class ParamFlowDefaultCheckerTest extends AbstractTimeBasedTest { - - @Test - public void testCheckQpsWithLongIntervalAndHighThreshold() { - try (MockedStatic mocked = super.mockTimeUtil()) { - // This test case is intended to avoid number overflow. - final String resourceName = "testCheckQpsWithLongIntervalAndHighThreshold"; - final ResourceWrapper resourceWrapper = new StringResourceWrapper(resourceName, EntryType.IN); - int paramIdx = 0; - - // Set a large threshold. - long threshold = 25000L; - - ParamFlowRule rule = new ParamFlowRule(resourceName) - .setCount(threshold) - .setParamIdx(paramIdx); - - String valueA = "valueA"; - ParameterMetric metric = new ParameterMetric(); - ParameterMetricStorage.getMetricsMap().put(resourceWrapper.getName(), metric); - metric.getRuleTimeCounterMap().put(rule, new ConcurrentLinkedHashMapWrapper(4000)); - metric.getRuleTokenCounterMap().put(rule, - new ConcurrentLinkedHashMapWrapper(4000)); - - // We mock the time directly to avoid unstable behaviour. - setCurrentMillis(mocked, System.currentTimeMillis()); - - assertTrue(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA)); - assertTrue(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA)); - - // 24 hours passed. - // This can make `toAddCount` larger that Integer.MAX_VALUE. - sleep(mocked, 1000 * 60 * 60 * 24); - assertTrue(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA)); - assertTrue(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA)); - - // 48 hours passed. - sleep(mocked, 1000 * 60 * 60 * 48); - assertTrue(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA)); - assertTrue(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA)); - } - } - - @Test - public void testParamFlowDefaultCheckSingleQps() { - try (MockedStatic mocked = super.mockTimeUtil()) { - final String resourceName = "testParamFlowDefaultCheckSingleQps"; - final ResourceWrapper resourceWrapper = new StringResourceWrapper(resourceName, EntryType.IN); - int paramIdx = 0; - - long threshold = 5L; - - ParamFlowRule rule = new ParamFlowRule(); - rule.setResource(resourceName); - rule.setCount(threshold); - rule.setParamIdx(paramIdx); - - String valueA = "valueA"; - ParameterMetric metric = new ParameterMetric(); - ParameterMetricStorage.getMetricsMap().put(resourceWrapper.getName(), metric); - metric.getRuleTimeCounterMap().put(rule, new ConcurrentLinkedHashMapWrapper(4000)); - metric.getRuleTokenCounterMap().put(rule, - new ConcurrentLinkedHashMapWrapper(4000)); - - // We mock the time directly to avoid unstable behaviour. - setCurrentMillis(mocked, System.currentTimeMillis()); - - assertTrue(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA)); - assertTrue(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA)); - assertTrue(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA)); - assertTrue(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA)); - assertTrue(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA)); - assertFalse(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA)); - - sleep(mocked, 3000); - assertTrue(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA)); - assertTrue(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA)); - assertTrue(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA)); - assertTrue(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA)); - assertTrue(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA)); - assertFalse(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA)); - } - } - - @Test - public void testParamFlowDefaultCheckSingleQpsWithBurst() throws InterruptedException { - try (MockedStatic mocked = super.mockTimeUtil()) { - final String resourceName = "testParamFlowDefaultCheckSingleQpsWithBurst"; - final ResourceWrapper resourceWrapper = new StringResourceWrapper(resourceName, EntryType.IN); - int paramIdx = 0; - - long threshold = 5L; - - ParamFlowRule rule = new ParamFlowRule(); - rule.setResource(resourceName); - rule.setCount(threshold); - rule.setParamIdx(paramIdx); - rule.setBurstCount(3); - - String valueA = "valueA"; - ParameterMetric metric = new ParameterMetric(); - ParameterMetricStorage.getMetricsMap().put(resourceWrapper.getName(), metric); - metric.getRuleTimeCounterMap().put(rule, new ConcurrentLinkedHashMapWrapper(4000)); - metric.getRuleTokenCounterMap().put(rule, - new ConcurrentLinkedHashMapWrapper(4000)); - - // We mock the time directly to avoid unstable behaviour. - setCurrentMillis(mocked, System.currentTimeMillis()); - - assertTrue(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA)); - assertTrue(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA)); - assertTrue(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA)); - assertTrue(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA)); - assertTrue(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA)); - assertTrue(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA)); - assertTrue(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA)); - assertTrue(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA)); - assertFalse(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA)); - - sleep(mocked, 1002); - assertTrue(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA)); - assertTrue(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA)); - assertTrue(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA)); - assertTrue(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA)); - assertTrue(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA)); - assertFalse(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA)); - - sleep(mocked, 1002); - assertTrue(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA)); - assertTrue(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA)); - assertTrue(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA)); - assertTrue(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA)); - assertTrue(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA)); - assertFalse(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA)); - - sleep(mocked, 2000); - assertTrue(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA)); - assertTrue(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA)); - assertTrue(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA)); - assertTrue(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA)); - assertTrue(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA)); - assertTrue(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA)); - assertTrue(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA)); - assertTrue(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA)); - assertFalse(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA)); - - sleep(mocked, 1002); - assertTrue(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA)); - assertTrue(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA)); - assertTrue(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA)); - assertTrue(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA)); - assertTrue(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA)); - assertFalse(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA)); - } - } - - @Test - public void testParamFlowDefaultCheckQpsInDifferentDuration() throws InterruptedException { - try (MockedStatic mocked = super.mockTimeUtil()) { - final String resourceName = "testParamFlowDefaultCheckQpsInDifferentDuration"; - final ResourceWrapper resourceWrapper = new StringResourceWrapper(resourceName, EntryType.IN); - int paramIdx = 0; - - long threshold = 5L; - - ParamFlowRule rule = new ParamFlowRule(); - rule.setResource(resourceName); - rule.setCount(threshold); - rule.setParamIdx(paramIdx); - rule.setDurationInSec(60); - - String valueA = "helloWorld"; - ParameterMetric metric = new ParameterMetric(); - ParameterMetricStorage.getMetricsMap().put(resourceWrapper.getName(), metric); - metric.getRuleTimeCounterMap().put(rule, new ConcurrentLinkedHashMapWrapper(4000)); - metric.getRuleTokenCounterMap().put(rule, - new ConcurrentLinkedHashMapWrapper(4000)); - - // We mock the time directly to avoid unstable behaviour. - setCurrentMillis(mocked, System.currentTimeMillis()); - - assertTrue(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA)); - assertTrue(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA)); - assertTrue(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA)); - assertTrue(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA)); - assertTrue(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA)); - - assertFalse(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA)); - - sleepSecond(mocked, 1); - assertFalse(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA)); - - sleepSecond(mocked, 10); - assertFalse(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA)); - - sleepSecond(mocked, 30); - assertFalse(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA)); - - sleepSecond(mocked, 30); - assertTrue(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA)); - assertTrue(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA)); - assertTrue(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA)); - assertTrue(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA)); - assertTrue(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA)); - - assertFalse(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA)); - } - } - - @Test - public void testParamFlowDefaultCheckSingleValueCheckQpsMultipleThreads() throws Exception { - try (MockedStatic mocked = super.mockTimeUtil()) { - // In this test case we use the actual time. - useActualTime(mocked); - - final String resourceName = "testParamFlowDefaultCheckSingleValueCheckQpsMultipleThreads"; - final ResourceWrapper resourceWrapper = new StringResourceWrapper(resourceName, EntryType.IN); - int paramIdx = 0; - - long threshold = 5L; - - final ParamFlowRule rule = new ParamFlowRule(); - rule.setResource(resourceName); - rule.setCount(threshold); - rule.setParamIdx(paramIdx); - - final String valueA = "valueA"; - ParameterMetric metric = new ParameterMetric(); - ParameterMetricStorage.getMetricsMap().put(resourceWrapper.getName(), metric); - metric.getRuleTimeCounterMap().put(rule, new ConcurrentLinkedHashMapWrapper(4000)); - metric.getRuleTokenCounterMap().put(rule, - new ConcurrentLinkedHashMapWrapper(4000)); - int threadCount = 40; - - final CountDownLatch waitLatch = new CountDownLatch(threadCount); - final AtomicInteger successCount = new AtomicInteger(); - for (int i = 0; i < threadCount; i++) { - Thread t = new Thread(new Runnable() { - @Override - public void run() { - if (ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA)) { - successCount.incrementAndGet(); - } - waitLatch.countDown(); - } - - }); - t.setName("sentinel-simulate-traffic-task-" + i); - t.start(); - } - waitLatch.await(); - - assertEquals(successCount.get(), threshold); - successCount.set(0); - - System.out.println("testParamFlowDefaultCheckSingleValueCheckQpsMultipleThreads: sleep for 3 seconds"); - TimeUnit.SECONDS.sleep(3); - - successCount.set(0); - final CountDownLatch waitLatch1 = new CountDownLatch(threadCount); - final long currentTime = TimeUtil.currentTimeMillis(); - final long endTime = currentTime + rule.getDurationInSec() * 1000 - 1; - for (int i = 0; i < threadCount; i++) { - Thread t = new Thread(new Runnable() { - @Override - public void run() { - long currentTime1 = currentTime; - while (currentTime1 <= endTime) { - if (ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA)) { - successCount.incrementAndGet(); - } - - try { - TimeUnit.MILLISECONDS.sleep(ThreadLocalRandom.current().nextInt(20)); - } catch (InterruptedException e) { - e.printStackTrace(); - } - currentTime1 = TimeUtil.currentTimeMillis(); - } - - waitLatch1.countDown(); - } - - }); - t.setName("sentinel-simulate-traffic-task-" + i); - t.start(); - } - waitLatch1.await(); - - assertEquals(successCount.get(), threshold); - } - } - - @Before - public void setUp() throws Exception { - ParameterMetricStorage.getMetricsMap().clear(); - } - - @After - public void tearDown() throws Exception { - ParameterMetricStorage.getMetricsMap().clear(); - } -} +/* + * Copyright 1999-2024 Alibaba Group Holding Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alibaba.csp.sentinel.slots.block.flow.param; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; + +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import com.alibaba.csp.sentinel.EntryType; +import com.alibaba.csp.sentinel.slotchain.ResourceWrapper; +import com.alibaba.csp.sentinel.slotchain.StringResourceWrapper; +import com.alibaba.csp.sentinel.slots.statistic.cache.ConcurrentLinkedHashMapWrapper; +import com.alibaba.csp.sentinel.block.flow.param.AbstractTimeBasedTest; +import com.alibaba.csp.sentinel.util.TimeUtil; +import org.mockito.MockedStatic; + +/** + * @author jialiang.linjl + * @author Eric Zhao + */ +public class ParamFlowDefaultCheckerTest extends AbstractTimeBasedTest { + + @Test + public void testCheckQpsWithLongIntervalAndHighThreshold() { + try (MockedStatic mocked = super.mockTimeUtil()) { + // This test case is intended to avoid number overflow. + final String resourceName = "testCheckQpsWithLongIntervalAndHighThreshold"; + final ResourceWrapper resourceWrapper = new StringResourceWrapper(resourceName, EntryType.IN); + int paramIdx = 0; + + // Set a large threshold. + long threshold = 25000L; + + ParamFlowRule rule = new ParamFlowRule(resourceName) + .setCount(threshold) + .setParamIdx(paramIdx); + + String valueA = "valueA"; + ParameterMetric metric = new ParameterMetric(); + ParameterMetricStorage.getMetricsMap().put(resourceWrapper.getName(), metric); + metric.getRuleTimeCounterMap().put(rule, new ConcurrentLinkedHashMapWrapper(4000)); + metric.getRuleTokenCounterMap().put(rule, new ConcurrentLinkedHashMapWrapper<>(4000)); + + // We mock the time directly to avoid unstable behaviour. + setCurrentMillis(mocked, System.currentTimeMillis()); + + assertTrue(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA)); + assertTrue(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA)); + + // 24 hours passed. + // This can make `toAddCount` larger that Integer.MAX_VALUE. + sleep(mocked, 1000 * 60 * 60 * 24); + assertTrue(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA)); + assertTrue(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA)); + + // 48 hours passed. + sleep(mocked, 1000 * 60 * 60 * 48); + assertTrue(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA)); + assertTrue(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA)); + } + } + + @Test + public void testParamFlowDefaultCheckSingleQps() { + try (MockedStatic mocked = super.mockTimeUtil()) { + final String resourceName = "testParamFlowDefaultCheckSingleQps"; + final ResourceWrapper resourceWrapper = new StringResourceWrapper(resourceName, EntryType.IN); + int paramIdx = 0; + + long threshold = 5L; + + ParamFlowRule rule = new ParamFlowRule(); + rule.setResource(resourceName); + rule.setCount(threshold); + rule.setParamIdx(paramIdx); + + String valueA = "valueA"; + ParameterMetric metric = new ParameterMetric(); + ParameterMetricStorage.getMetricsMap().put(resourceWrapper.getName(), metric); + metric.getRuleTimeCounterMap().put(rule, new ConcurrentLinkedHashMapWrapper(4000)); + metric.getRuleTokenCounterMap().put(rule, new ConcurrentLinkedHashMapWrapper<>(4000)); + + // We mock the time directly to avoid unstable behaviour. + setCurrentMillis(mocked, System.currentTimeMillis()); + + assertTrue(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA)); + assertTrue(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA)); + assertTrue(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA)); + assertTrue(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA)); + assertTrue(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA)); + assertFalse(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA)); + + sleep(mocked, 3000); + assertTrue(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA)); + assertTrue(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA)); + assertTrue(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA)); + assertTrue(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA)); + assertTrue(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA)); + assertFalse(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA)); + } + } + + @Test + public void testParamFlowDefaultCheckSingleQpsWithBurst() throws InterruptedException { + try (MockedStatic mocked = super.mockTimeUtil()) { + final String resourceName = "testParamFlowDefaultCheckSingleQpsWithBurst"; + final ResourceWrapper resourceWrapper = new StringResourceWrapper(resourceName, EntryType.IN); + int paramIdx = 0; + + long threshold = 5L; + + ParamFlowRule rule = new ParamFlowRule(); + rule.setResource(resourceName); + rule.setCount(threshold); + rule.setParamIdx(paramIdx); + rule.setBurstCount(3); + + String valueA = "valueA"; + ParameterMetric metric = new ParameterMetric(); + ParameterMetricStorage.getMetricsMap().put(resourceWrapper.getName(), metric); + metric.getRuleTimeCounterMap().put(rule, new ConcurrentLinkedHashMapWrapper(4000)); + metric.getRuleTokenCounterMap().put(rule, new ConcurrentLinkedHashMapWrapper<>(4000)); + + // We mock the time directly to avoid unstable behaviour. + setCurrentMillis(mocked, System.currentTimeMillis()); + + assertTrue(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA)); + assertTrue(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA)); + assertTrue(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA)); + assertTrue(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA)); + assertTrue(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA)); + assertTrue(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA)); + assertTrue(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA)); + assertTrue(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA)); + assertFalse(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA)); + + sleep(mocked, 1002); + assertTrue(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA)); + assertTrue(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA)); + assertTrue(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA)); + assertTrue(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA)); + assertTrue(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA)); + assertFalse(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA)); + + sleep(mocked, 1002); + assertTrue(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA)); + assertTrue(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA)); + assertTrue(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA)); + assertTrue(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA)); + assertTrue(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA)); + assertFalse(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA)); + + sleep(mocked, 2000); + assertTrue(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA)); + assertTrue(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA)); + assertTrue(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA)); + assertTrue(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA)); + assertTrue(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA)); + assertTrue(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA)); + assertTrue(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA)); + assertTrue(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA)); + assertFalse(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA)); + + sleep(mocked, 1002); + assertTrue(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA)); + assertTrue(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA)); + assertTrue(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA)); + assertTrue(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA)); + assertTrue(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA)); + assertFalse(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA)); + } + } + + @Test + public void testParamFlowDefaultCheckQpsInDifferentDuration() throws InterruptedException { + try (MockedStatic mocked = super.mockTimeUtil()) { + final String resourceName = "testParamFlowDefaultCheckQpsInDifferentDuration"; + final ResourceWrapper resourceWrapper = new StringResourceWrapper(resourceName, EntryType.IN); + int paramIdx = 0; + + long threshold = 5L; + + ParamFlowRule rule = new ParamFlowRule(); + rule.setResource(resourceName); + rule.setCount(threshold); + rule.setParamIdx(paramIdx); + rule.setDurationInSec(60); + + String valueA = "helloWorld"; + ParameterMetric metric = new ParameterMetric(); + ParameterMetricStorage.getMetricsMap().put(resourceWrapper.getName(), metric); + metric.getRuleTimeCounterMap().put(rule, new ConcurrentLinkedHashMapWrapper(4000)); + metric.getRuleTokenCounterMap().put(rule, new ConcurrentLinkedHashMapWrapper<>(4000)); + + // We mock the time directly to avoid unstable behaviour. + setCurrentMillis(mocked, System.currentTimeMillis()); + + assertTrue(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA)); + assertTrue(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA)); + assertTrue(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA)); + assertTrue(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA)); + assertTrue(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA)); + + assertFalse(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA)); + + sleepSecond(mocked, 1); + assertFalse(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA)); + + sleepSecond(mocked, 10); + assertFalse(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA)); + + sleepSecond(mocked, 30); + assertFalse(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA)); + + sleepSecond(mocked, 30); + assertTrue(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA)); + assertTrue(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA)); + assertTrue(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA)); + assertTrue(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA)); + assertTrue(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA)); + + assertFalse(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA)); + } + } + + @Test + public void testParamFlowDefaultCheckSingleValueCheckQpsMultipleThreads() throws Exception { + final String resourceName = "testParamFlowDefaultCheckSingleValueCheckQpsMultipleThreads"; + final ResourceWrapper resourceWrapper = new StringResourceWrapper(resourceName, EntryType.IN); + int paramIdx = 0; + + long threshold = 5L; + + final ParamFlowRule rule = new ParamFlowRule(); + rule.setResource(resourceName); + rule.setCount(threshold); + rule.setParamIdx(paramIdx); + rule.setDurationInSec(3); + + final String valueA = "valueA"; + ParameterMetric metric = new ParameterMetric(); + ParameterMetricStorage.getMetricsMap().put(resourceWrapper.getName(), metric); + metric.getRuleTimeCounterMap().put(rule, new ConcurrentLinkedHashMapWrapper(4000)); + metric.getRuleTokenCounterMap().put(rule, new ConcurrentLinkedHashMapWrapper<>(4000)); + int threadCount = 40; + + final CountDownLatch waitLatch = new CountDownLatch(threadCount); + final AtomicInteger successCount = new AtomicInteger(); + for (int i = 0; i < threadCount; i++) { + Thread t = new Thread(() -> { + if (ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA)) { + successCount.incrementAndGet(); + } + waitLatch.countDown(); + }); + t.setName("sentinel-simulate-traffic-task-" + i); + t.start(); + } + waitLatch.await(); + + assertEquals(threshold, successCount.get()); + successCount.set(0); + + System.out.println("testParamFlowDefaultCheckSingleValueCheckQpsMultipleThreads: sleep for 3 seconds"); + TimeUnit.SECONDS.sleep(3); + + successCount.set(0); + final CountDownLatch waitLatch1 = new CountDownLatch(threadCount); + final long currentTime = TimeUtil.currentTimeMillis(); + final long endTime = currentTime + rule.getDurationInSec() * 1000 - 500; + for (int i = 0; i < threadCount; i++) { + Thread t = new Thread(() -> { + while (TimeUtil.currentTimeMillis() <= endTime) { + if (ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA)) { + successCount.incrementAndGet(); + } + + try { + TimeUnit.MILLISECONDS.sleep(ThreadLocalRandom.current().nextInt(20)); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + + waitLatch1.countDown(); + }); + t.setName("sentinel-simulate-traffic-task-" + i); + t.start(); + } + waitLatch1.await(); + + assertEquals(threshold, successCount.get()); + } + + @Before + public void setUp() throws Exception { + ParameterMetricStorage.getMetricsMap().clear(); + } + + @After + public void tearDown() throws Exception { + ParameterMetricStorage.getMetricsMap().clear(); + } +} diff --git a/sentinel-extension/sentinel-parameter-flow-control/src/test/java/com/alibaba/csp/sentinel/slots/block/flow/param/ParamFlowSlotTest.java b/sentinel-extension/sentinel-parameter-flow-control/src/test/java/com/alibaba/csp/sentinel/slots/block/flow/param/ParamFlowSlotTest.java index aadef74a..7bec58df 100644 --- a/sentinel-extension/sentinel-parameter-flow-control/src/test/java/com/alibaba/csp/sentinel/slots/block/flow/param/ParamFlowSlotTest.java +++ b/sentinel-extension/sentinel-parameter-flow-control/src/test/java/com/alibaba/csp/sentinel/slots/block/flow/param/ParamFlowSlotTest.java @@ -24,8 +24,8 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; import java.util.Collections; -import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; import org.junit.After; import org.junit.Before; @@ -100,9 +100,9 @@ public class ParamFlowSlotTest { ParameterMetric metric = mock(ParameterMetric.class); CacheMap map = new ConcurrentLinkedHashMapWrapper<>(4000); - CacheMap map2 = new ConcurrentLinkedHashMapWrapper<>(4000); + CacheMap> map2 = new ConcurrentLinkedHashMapWrapper<>(4000); when(metric.getRuleTimeCounter(rule)).thenReturn(map); - when(metric.getRuleTokenCounter(rule)).thenReturn(map2); + when(metric.getRuleStampedTokenCounter(rule)).thenReturn(map2); map.put(argToGo, new AtomicLong(TimeUtil.currentTimeMillis())); // Insert the mock metric to control pass or block.