fix flaky tests (#3364)
* fix ParamFlowPartialIntegrationTest mock TimeUtil to avoid time changed, and bucket moved to next. * remove csp.sentinel.log.output.type remove sysprop csp.sentinel.log.output.type in TimeUtilTest, to avoid a few outputs. * try fix testConcurrentAcquireAndRelease * remove stdout remove sout in ParamFlowThrottleRateLimitingCheckerTest, to avoid a few outputs. * ConcurrentClusterFlowCheckerTest,djust the flowId to prevent conflicts * CurrentConcurrencyManagerTest,adjust the flowId to prevent conflicts * update year
This commit is contained in:
parent
24c93c8a1a
commit
befdc56885
|
|
@ -1,5 +1,5 @@
|
||||||
/*
|
/*
|
||||||
* Copyright 1999-2018 Alibaba Group Holding Ltd.
|
* Copyright 1999-2024 Alibaba Group Holding Ltd.
|
||||||
*
|
*
|
||||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
* you may not use this file except in compliance with the License.
|
* you may not use this file except in compliance with the License.
|
||||||
|
|
@ -22,6 +22,7 @@ import com.alibaba.csp.sentinel.cluster.flow.statistic.concurrent.CurrentConcurr
|
||||||
import com.alibaba.csp.sentinel.cluster.flow.statistic.concurrent.TokenCacheNodeManager;
|
import com.alibaba.csp.sentinel.cluster.flow.statistic.concurrent.TokenCacheNodeManager;
|
||||||
import com.alibaba.csp.sentinel.cluster.server.connection.ConnectionManager;
|
import com.alibaba.csp.sentinel.cluster.server.connection.ConnectionManager;
|
||||||
import com.alibaba.csp.sentinel.cluster.server.AbstractTimeBasedTest;
|
import com.alibaba.csp.sentinel.cluster.server.AbstractTimeBasedTest;
|
||||||
|
import com.alibaba.csp.sentinel.concurrent.NamedThreadFactory;
|
||||||
import com.alibaba.csp.sentinel.slots.block.ClusterRuleConstant;
|
import com.alibaba.csp.sentinel.slots.block.ClusterRuleConstant;
|
||||||
import com.alibaba.csp.sentinel.slots.block.RuleConstant;
|
import com.alibaba.csp.sentinel.slots.block.RuleConstant;
|
||||||
import com.alibaba.csp.sentinel.slots.block.flow.ClusterFlowConfig;
|
import com.alibaba.csp.sentinel.slots.block.flow.ClusterFlowConfig;
|
||||||
|
|
@ -47,7 +48,7 @@ public class ConcurrentClusterFlowCheckerTest extends AbstractTimeBasedTest {
|
||||||
ClusterFlowConfig config = new ClusterFlowConfig();
|
ClusterFlowConfig config = new ClusterFlowConfig();
|
||||||
config.setResourceTimeout(500);
|
config.setResourceTimeout(500);
|
||||||
config.setClientOfflineTime(1000);
|
config.setClientOfflineTime(1000);
|
||||||
config.setFlowId(111L);
|
config.setFlowId(179L);
|
||||||
config.setThresholdType(ClusterRuleConstant.FLOW_THRESHOLD_GLOBAL);
|
config.setThresholdType(ClusterRuleConstant.FLOW_THRESHOLD_GLOBAL);
|
||||||
rule.setClusterConfig(config);
|
rule.setClusterConfig(config);
|
||||||
rule.setClusterMode(true);
|
rule.setClusterMode(true);
|
||||||
|
|
@ -64,7 +65,7 @@ public class ConcurrentClusterFlowCheckerTest extends AbstractTimeBasedTest {
|
||||||
public void testEasyAcquireAndRelease() throws InterruptedException {
|
public void testEasyAcquireAndRelease() throws InterruptedException {
|
||||||
try (MockedStatic<TimeUtil> mocked = super.mockTimeUtil()) {
|
try (MockedStatic<TimeUtil> mocked = super.mockTimeUtil()) {
|
||||||
setCurrentMillis(mocked, System.currentTimeMillis());
|
setCurrentMillis(mocked, System.currentTimeMillis());
|
||||||
FlowRule rule = ClusterFlowRuleManager.getFlowRuleById(111L);
|
FlowRule rule = ClusterFlowRuleManager.getFlowRuleById(179L);
|
||||||
ArrayList<Long> list = new ArrayList<>();
|
ArrayList<Long> list = new ArrayList<>();
|
||||||
for (int i = 0; i < 10; i++) {
|
for (int i = 0; i < 10; i++) {
|
||||||
TokenResult result = ConcurrentClusterFlowChecker.acquireConcurrentToken("127.0.0.1", rule, 1);
|
TokenResult result = ConcurrentClusterFlowChecker.acquireConcurrentToken("127.0.0.1", rule, 1);
|
||||||
|
|
@ -83,7 +84,7 @@ public class ConcurrentClusterFlowCheckerTest extends AbstractTimeBasedTest {
|
||||||
result.getStatus() == TokenResultStatus.RELEASE_OK);
|
result.getStatus() == TokenResultStatus.RELEASE_OK);
|
||||||
}
|
}
|
||||||
Assert.assertTrue("fail to release token",
|
Assert.assertTrue("fail to release token",
|
||||||
CurrentConcurrencyManager.get(111L).get() == 0 && TokenCacheNodeManager.getSize() == 0);
|
CurrentConcurrencyManager.get(179L).get() == 0 && TokenCacheNodeManager.getSize() == 0);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -91,22 +92,22 @@ public class ConcurrentClusterFlowCheckerTest extends AbstractTimeBasedTest {
|
||||||
public void testConcurrentAcquireAndRelease() throws InterruptedException {
|
public void testConcurrentAcquireAndRelease() throws InterruptedException {
|
||||||
try (MockedStatic<TimeUtil> mocked = super.mockTimeUtil()) {
|
try (MockedStatic<TimeUtil> mocked = super.mockTimeUtil()) {
|
||||||
setCurrentMillis(mocked, System.currentTimeMillis());
|
setCurrentMillis(mocked, System.currentTimeMillis());
|
||||||
final FlowRule rule = ClusterFlowRuleManager.getFlowRuleById(111L);
|
final FlowRule rule = ClusterFlowRuleManager.getFlowRuleById(179L);
|
||||||
final CountDownLatch countDownLatch = new CountDownLatch(1000);
|
final CountDownLatch countDownLatch = new CountDownLatch(1000);
|
||||||
ExecutorService pool = Executors.newFixedThreadPool(100);
|
ExecutorService pool = Executors.newFixedThreadPool(100,
|
||||||
|
new NamedThreadFactory("ConcurrentClusterFlowCheckerTest", true)
|
||||||
|
);
|
||||||
|
|
||||||
for (long i = 0; i < 1000; i++) {
|
for (long i = 0; i < 1000; i++) {
|
||||||
Runnable task = new Runnable() {
|
Runnable task = () -> {
|
||||||
@Override
|
Assert.assertNotNull(rule);
|
||||||
public void run() {
|
TokenResult result = ConcurrentClusterFlowChecker.acquireConcurrentToken("127.0.0.1", rule, 1);
|
||||||
assert rule != null;
|
String msg = String.format("concurrent control fail %s<%s", CurrentConcurrencyManager.get(179L).get(), rule.getCount());
|
||||||
TokenResult result = ConcurrentClusterFlowChecker.acquireConcurrentToken("127.0.0.1", rule, 1);
|
Assert.assertTrue(msg, CurrentConcurrencyManager.get(179L).get() <= rule.getCount());
|
||||||
Assert.assertTrue("concurrent control fail", CurrentConcurrencyManager.get(111L).get() <= rule.getCount());
|
if (result.getStatus() == TokenResultStatus.OK) {
|
||||||
if (result.getStatus() == TokenResultStatus.OK) {
|
ConcurrentClusterFlowChecker.releaseConcurrentToken(result.getTokenId());
|
||||||
ConcurrentClusterFlowChecker.releaseConcurrentToken(result.getTokenId());
|
|
||||||
}
|
|
||||||
countDownLatch.countDown();
|
|
||||||
}
|
}
|
||||||
|
countDownLatch.countDown();
|
||||||
};
|
};
|
||||||
pool.execute(task);
|
pool.execute(task);
|
||||||
}
|
}
|
||||||
|
|
@ -121,7 +122,7 @@ public class ConcurrentClusterFlowCheckerTest extends AbstractTimeBasedTest {
|
||||||
@Test
|
@Test
|
||||||
public void testReleaseExpiredToken() throws InterruptedException {
|
public void testReleaseExpiredToken() throws InterruptedException {
|
||||||
ConnectionManager.addConnection("test", "127.0.0.1");
|
ConnectionManager.addConnection("test", "127.0.0.1");
|
||||||
FlowRule rule = ClusterFlowRuleManager.getFlowRuleById(111L);
|
FlowRule rule = ClusterFlowRuleManager.getFlowRuleById(179L);
|
||||||
for (int i = 0; i < 10; i++) {
|
for (int i = 0; i < 10; i++) {
|
||||||
ConcurrentClusterFlowChecker.acquireConcurrentToken("127.0.0.1", rule, 1);
|
ConcurrentClusterFlowChecker.acquireConcurrentToken("127.0.0.1", rule, 1);
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -1,5 +1,5 @@
|
||||||
/*
|
/*
|
||||||
* Copyright 1999-2018 Alibaba Group Holding Ltd.
|
* Copyright 1999-2024 Alibaba Group Holding Ltd.
|
||||||
*
|
*
|
||||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
* you may not use this file except in compliance with the License.
|
* you may not use this file except in compliance with the License.
|
||||||
|
|
@ -15,7 +15,7 @@
|
||||||
*/
|
*/
|
||||||
package com.alibaba.csp.sentinel.cluster.flow.statistic.concurrent;
|
package com.alibaba.csp.sentinel.cluster.flow.statistic.concurrent;
|
||||||
|
|
||||||
import com.alibaba.csp.sentinel.cluster.flow.statistic.concurrent.CurrentConcurrencyManager;
|
import com.alibaba.csp.sentinel.concurrent.NamedThreadFactory;
|
||||||
import org.junit.Assert;
|
import org.junit.Assert;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
|
||||||
|
|
@ -26,24 +26,23 @@ import java.util.concurrent.Executors;
|
||||||
public class CurrentConcurrencyManagerTest {
|
public class CurrentConcurrencyManagerTest {
|
||||||
@Test
|
@Test
|
||||||
public void updateTest() throws InterruptedException {
|
public void updateTest() throws InterruptedException {
|
||||||
CurrentConcurrencyManager.put(111L, 0);
|
CurrentConcurrencyManager.put(113L, 0);
|
||||||
CurrentConcurrencyManager.put(222L, 0);
|
CurrentConcurrencyManager.put(223L, 0);
|
||||||
final CountDownLatch countDownLatch = new CountDownLatch(1000);
|
final CountDownLatch countDownLatch = new CountDownLatch(1000);
|
||||||
ExecutorService pool = Executors.newFixedThreadPool(100);
|
ExecutorService pool = Executors.newFixedThreadPool(100,
|
||||||
|
new NamedThreadFactory("CurrentConcurrencyManagerTest", true)
|
||||||
|
);
|
||||||
for (int i = 0; i < 1000; i++) {
|
for (int i = 0; i < 1000; i++) {
|
||||||
Runnable task = new Runnable() {
|
Runnable task = () -> {
|
||||||
@Override
|
CurrentConcurrencyManager.addConcurrency(113L, 1);
|
||||||
public void run() {
|
CurrentConcurrencyManager.addConcurrency(223L, 2);
|
||||||
CurrentConcurrencyManager.addConcurrency(111L, 1);
|
countDownLatch.countDown();
|
||||||
CurrentConcurrencyManager.addConcurrency(222L, 2);
|
|
||||||
countDownLatch.countDown();
|
|
||||||
}
|
|
||||||
};
|
};
|
||||||
pool.execute(task);
|
pool.execute(task);
|
||||||
}
|
}
|
||||||
countDownLatch.await();
|
countDownLatch.await();
|
||||||
pool.shutdown();
|
pool.shutdown();
|
||||||
Assert.assertEquals(1000, CurrentConcurrencyManager.get(111L).get());
|
Assert.assertEquals(1000, CurrentConcurrencyManager.get(113L).get());
|
||||||
Assert.assertEquals(2000, CurrentConcurrencyManager.get(222L).get());
|
Assert.assertEquals(2000, CurrentConcurrencyManager.get(223L).get());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -1,5 +1,5 @@
|
||||||
/*
|
/*
|
||||||
* Copyright 1999-2020 Alibaba Group Holding Ltd.
|
* Copyright 1999-2024 Alibaba Group Holding Ltd.
|
||||||
*
|
*
|
||||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
* you may not use this file except in compliance with the License.
|
* you may not use this file except in compliance with the License.
|
||||||
|
|
@ -32,10 +32,6 @@ import com.alibaba.csp.sentinel.util.function.Tuple2;
|
||||||
*
|
*
|
||||||
*/
|
*/
|
||||||
public class TimeUtilTest {
|
public class TimeUtilTest {
|
||||||
@Before
|
|
||||||
public void initLogging() {
|
|
||||||
System.setProperty("csp.sentinel.log.output.type", "console");
|
|
||||||
}
|
|
||||||
|
|
||||||
private void waitFor(int step, int seconds) throws InterruptedException {
|
private void waitFor(int step, int seconds) throws InterruptedException {
|
||||||
for (int i = 0; i < seconds; i ++) {
|
for (int i = 0; i < seconds; i ++) {
|
||||||
|
|
|
||||||
|
|
@ -3,10 +3,13 @@ package com.alibaba.csp.sentinel.slots.block.flow.param;
|
||||||
import com.alibaba.csp.sentinel.Entry;
|
import com.alibaba.csp.sentinel.Entry;
|
||||||
import com.alibaba.csp.sentinel.EntryType;
|
import com.alibaba.csp.sentinel.EntryType;
|
||||||
import com.alibaba.csp.sentinel.SphU;
|
import com.alibaba.csp.sentinel.SphU;
|
||||||
|
import com.alibaba.csp.sentinel.block.flow.param.AbstractTimeBasedTest;
|
||||||
import com.alibaba.csp.sentinel.slots.block.BlockException;
|
import com.alibaba.csp.sentinel.slots.block.BlockException;
|
||||||
|
import com.alibaba.csp.sentinel.util.TimeUtil;
|
||||||
import org.junit.After;
|
import org.junit.After;
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
import org.mockito.MockedStatic;
|
||||||
|
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
|
|
@ -18,7 +21,7 @@ import static org.junit.Assert.assertTrue;
|
||||||
* @author quguai
|
* @author quguai
|
||||||
* @date 2023/10/27 13:44
|
* @date 2023/10/27 13:44
|
||||||
*/
|
*/
|
||||||
public class ParamFlowPartialIntegrationTest {
|
public class ParamFlowPartialIntegrationTest extends AbstractTimeBasedTest {
|
||||||
|
|
||||||
@Before
|
@Before
|
||||||
public void setUp() throws Exception {
|
public void setUp() throws Exception {
|
||||||
|
|
@ -32,22 +35,25 @@ public class ParamFlowPartialIntegrationTest {
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testParamFlowRegex() {
|
public void testParamFlowRegex() {
|
||||||
ParamFlowRule rule = new ParamFlowRule(".*")
|
try (MockedStatic<TimeUtil> mocked = super.mockTimeUtil()) {
|
||||||
.setParamIdx(0)
|
setCurrentMillis(mocked, 1800000000000L);
|
||||||
.setCount(1);
|
ParamFlowRule rule = new ParamFlowRule(".*")
|
||||||
rule.setRegex(true);
|
.setParamIdx(0)
|
||||||
ParamFlowRuleManager.loadRules(Collections.singletonList(rule));
|
.setCount(1);
|
||||||
verifyFlow("testParamFlowRegex_1", true, "args");
|
rule.setRegex(true);
|
||||||
verifyFlow("testParamFlowRegex_1", true, "args_1");
|
ParamFlowRuleManager.loadRules(Collections.singletonList(rule));
|
||||||
|
verifyFlow("testParamFlowRegex_1", true, "args");
|
||||||
|
verifyFlow("testParamFlowRegex_1", true, "args_1");
|
||||||
|
|
||||||
verifyFlow("testParamFlowRegex_1", false, "args");
|
verifyFlow("testParamFlowRegex_1", false, "args");
|
||||||
verifyFlow("testParamFlowRegex_1", false, "args_1");
|
verifyFlow("testParamFlowRegex_1", false, "args_1");
|
||||||
|
|
||||||
verifyFlow("testParamFlowRegex_2", true, "args");
|
verifyFlow("testParamFlowRegex_2", true, "args");
|
||||||
verifyFlow("testParamFlowRegex_2", true, "args_1");
|
verifyFlow("testParamFlowRegex_2", true, "args_1");
|
||||||
|
|
||||||
verifyFlow("testParamFlowRegex_2", false, "args");
|
verifyFlow("testParamFlowRegex_2", false, "args");
|
||||||
verifyFlow("testParamFlowRegex_2", false, "args_1");
|
verifyFlow("testParamFlowRegex_2", false, "args_1");
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -1,5 +1,5 @@
|
||||||
/*
|
/*
|
||||||
* Copyright 1999-2019 Alibaba Group Holding Ltd.
|
* Copyright 1999-2024 Alibaba Group Holding Ltd.
|
||||||
*
|
*
|
||||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
* you may not use this file except in compliance with the License.
|
* you may not use this file except in compliance with the License.
|
||||||
|
|
@ -70,7 +70,6 @@ public class ParamFlowThrottleRateLimitingCheckerTest {
|
||||||
}
|
}
|
||||||
assertEquals(successCount, threshold);
|
assertEquals(successCount, threshold);
|
||||||
|
|
||||||
System.out.println("testSingleValueThrottleCheckQps: sleep for 3 seconds");
|
|
||||||
TimeUnit.SECONDS.sleep(3);
|
TimeUnit.SECONDS.sleep(3);
|
||||||
|
|
||||||
currentTime = TimeUtil.currentTimeMillis();
|
currentTime = TimeUtil.currentTimeMillis();
|
||||||
|
|
@ -104,7 +103,6 @@ public class ParamFlowThrottleRateLimitingCheckerTest {
|
||||||
metric.getRuleTimeCounterMap().put(rule, new ConcurrentLinkedHashMapWrapper<Object, AtomicLong>(4000));
|
metric.getRuleTimeCounterMap().put(rule, new ConcurrentLinkedHashMapWrapper<Object, AtomicLong>(4000));
|
||||||
|
|
||||||
int threadCount = 40;
|
int threadCount = 40;
|
||||||
System.out.println(metric.getRuleTimeCounter(rule));
|
|
||||||
|
|
||||||
final CountDownLatch waitLatch = new CountDownLatch(threadCount);
|
final CountDownLatch waitLatch = new CountDownLatch(threadCount);
|
||||||
final AtomicInteger successCount = new AtomicInteger();
|
final AtomicInteger successCount = new AtomicInteger();
|
||||||
|
|
@ -125,10 +123,8 @@ public class ParamFlowThrottleRateLimitingCheckerTest {
|
||||||
waitLatch.await();
|
waitLatch.await();
|
||||||
|
|
||||||
assertEquals(successCount.get(), 1);
|
assertEquals(successCount.get(), 1);
|
||||||
System.out.println(threadCount);
|
|
||||||
successCount.set(0);
|
successCount.set(0);
|
||||||
|
|
||||||
System.out.println("testSingleValueThrottleCheckQpsMultipleThreads: sleep for 3 seconds");
|
|
||||||
TimeUnit.SECONDS.sleep(3);
|
TimeUnit.SECONDS.sleep(3);
|
||||||
|
|
||||||
successCount.set(0);
|
successCount.set(0);
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue