From 044cdbb1bf83886e5e3bb241a4dc42fe4a9ebc4c Mon Sep 17 00:00:00 2001 From: Eric Zhao Date: Thu, 14 Mar 2019 10:04:56 +0800 Subject: [PATCH] Add occupy mechanism for future buckets of sliding window to support "prioritized requests final pass" (#568) * Rename: MetricsLeapArray -> BucketLeapArray * Add implementation for `FutureBucketLeapArray`, a kind of `BucketLeapArray` that only reserves for future buckets, which is used for calculating occupied future tokens. * Add OccupiableBucketLeapArray that combines common BucketLeapArray with FutureBucketLeapArray. The rollingNumberInSecond in StatisticNode now uses OccupiableBucketLeapArray by default. * Add OccupySupport interface. Node now implements OccupySupport interface. * Add occupy-related methods in Metric and ArrayMetric. * Handle prioritized requests in default traffic shaping controller. * Update default occupyTimeout to 500ms Signed-off-by: Eric Zhao --- .../metric/ClusterMetricLeapArray.java | 2 +- .../metric/ClusterParameterLeapArray.java | 3 +- .../com/alibaba/csp/sentinel/node/Node.java | 2 +- .../csp/sentinel/node/OccupySupport.java | 70 ++++++++++ .../sentinel/node/OccupyTimeoutProperty.java | 79 +++++++++++ .../csp/sentinel/node/StatisticNode.java | 75 +++++++++-- .../csp/sentinel/node/metric/MetricNode.java | 52 ++++--- .../slots/block/flow/FlowRuleChecker.java | 2 +- .../block/flow/PriorityWaitException.java | 40 ++++++ .../flow/controller/DefaultController.java | 23 +++- .../sentinel/slots/statistic/MetricEvent.java | 8 +- .../slots/statistic/StatisticSlot.java | 16 +++ .../slots/statistic/base/LeapArray.java | 95 ++++++++++--- .../slots/statistic/base/UnaryLeapArray.java | 2 +- .../slots/statistic/base/WindowWrap.java | 11 ++ .../slots/statistic/data/MetricBucket.java | 22 +++ .../slots/statistic/metric/ArrayMetric.java | 119 +++++++++++++--- ...icsLeapArray.java => BucketLeapArray.java} | 10 +- .../slots/statistic/metric/Metric.java | 76 +++++++++-- .../metric/occupy/FutureBucketLeapArray.java | 53 ++++++++ .../occupy/OccupiableBucketLeapArray.java | 101 ++++++++++++++ .../csp/sentinel/node/StatisticNodeTest.java | 10 +- .../slots/statistic/base/LeapArrayTest.java | 2 +- .../statistic}/metric/ArrayMetricTest.java | 16 +-- .../metric/BucketLeapArrayTest.java} | 39 +++--- .../metric/FutureBucketLeapArrayTest.java | 32 +++++ .../metric/OccupiableBucketLeapArrayTest.java | 127 ++++++++++++++++++ .../metric/HotParameterLeapArray.java | 2 +- 28 files changed, 957 insertions(+), 132 deletions(-) create mode 100644 sentinel-core/src/main/java/com/alibaba/csp/sentinel/node/OccupySupport.java create mode 100644 sentinel-core/src/main/java/com/alibaba/csp/sentinel/node/OccupyTimeoutProperty.java create mode 100644 sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/block/flow/PriorityWaitException.java rename sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/statistic/metric/{MetricsLeapArray.java => BucketLeapArray.java} (88%) create mode 100644 sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/statistic/metric/occupy/FutureBucketLeapArray.java create mode 100644 sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/statistic/metric/occupy/OccupiableBucketLeapArray.java rename sentinel-core/src/test/java/com/alibaba/csp/sentinel/{base => slots/statistic}/metric/ArrayMetricTest.java (88%) mode change 100755 => 100644 rename sentinel-core/src/test/java/com/alibaba/csp/sentinel/{base/metric/MetricsLeapArrayTest.java => slots/statistic/metric/BucketLeapArrayTest.java} (85%) mode change 100755 => 100644 create mode 100644 sentinel-core/src/test/java/com/alibaba/csp/sentinel/slots/statistic/metric/FutureBucketLeapArrayTest.java create mode 100644 sentinel-core/src/test/java/com/alibaba/csp/sentinel/slots/statistic/metric/OccupiableBucketLeapArrayTest.java diff --git a/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/flow/statistic/metric/ClusterMetricLeapArray.java b/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/flow/statistic/metric/ClusterMetricLeapArray.java index da76b032..15bc2b88 100644 --- a/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/flow/statistic/metric/ClusterMetricLeapArray.java +++ b/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/flow/statistic/metric/ClusterMetricLeapArray.java @@ -40,7 +40,7 @@ public class ClusterMetricLeapArray extends LeapArray { } @Override - public ClusterMetricBucket newEmptyBucket() { + public ClusterMetricBucket newEmptyBucket(long timeMillis) { return new ClusterMetricBucket(); } diff --git a/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/flow/statistic/metric/ClusterParameterLeapArray.java b/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/flow/statistic/metric/ClusterParameterLeapArray.java index 138baa64..2b5fabc4 100644 --- a/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/flow/statistic/metric/ClusterParameterLeapArray.java +++ b/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/flow/statistic/metric/ClusterParameterLeapArray.java @@ -37,7 +37,7 @@ public class ClusterParameterLeapArray extends LeapArray> } @Override - public CacheMap newEmptyBucket() { + public CacheMap newEmptyBucket(long timeMillis) { return new ConcurrentLinkedHashMapWrapper<>(maxCapacity); } @@ -48,5 +48,4 @@ public class ClusterParameterLeapArray extends LeapArray> return w; } - } diff --git a/sentinel-core/src/main/java/com/alibaba/csp/sentinel/node/Node.java b/sentinel-core/src/main/java/com/alibaba/csp/sentinel/node/Node.java index d87eefdb..06739154 100755 --- a/sentinel-core/src/main/java/com/alibaba/csp/sentinel/node/Node.java +++ b/sentinel-core/src/main/java/com/alibaba/csp/sentinel/node/Node.java @@ -28,7 +28,7 @@ import com.alibaba.csp.sentinel.slots.statistic.metric.DebugSupport; * @author leyou * @author Eric Zhao */ -public interface Node extends DebugSupport { +public interface Node extends OccupySupport, DebugSupport { /** * Get incoming request per minute ({@code pass + block}). diff --git a/sentinel-core/src/main/java/com/alibaba/csp/sentinel/node/OccupySupport.java b/sentinel-core/src/main/java/com/alibaba/csp/sentinel/node/OccupySupport.java new file mode 100644 index 00000000..801d4eb8 --- /dev/null +++ b/sentinel-core/src/main/java/com/alibaba/csp/sentinel/node/OccupySupport.java @@ -0,0 +1,70 @@ +/* + * Copyright 1999-2019 Alibaba Group Holding Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alibaba.csp.sentinel.node; + +/** + * @author Eric Zhao + * @since 1.5.0 + */ +public interface OccupySupport { + + /** + * Try to occupy latter time windows' tokens. If occupy success, a value less than + * {@code occupyTimeout} in {@link OccupyTimeoutProperty} will be return. + * + *

+ * Each time we occupy tokens of the future window, current thread should sleep for the + * corresponding time for smoothing QPS. We can't occupy tokens of the future with unlimited, + * the sleep time limit is {@code occupyTimeout} in {@link OccupyTimeoutProperty}. + *

+ * + * @param currentTime current time millis. + * @param acquireCount tokens count to acquire. + * @param threshold qps threshold. + * @return time should sleep. Time >= {@code occupyTimeout} in {@link OccupyTimeoutProperty} means + * occupy fail, in this case, the request should be rejected immediately. + */ + long tryOccupyNext(long currentTime, int acquireCount, double threshold); + + /** + * Get current waiting amount. Useful for debug. + * + * @return current waiting amount + */ + long waiting(); + + /** + * Add request that occupied. + * + * @param futureTime future timestamp that the acquireCount should be added on. + * @param acquireCount tokens count. + */ + void addWaitingRequest(long futureTime, int acquireCount); + + /** + * Add occupied pass request, which represents pass requests that borrow the latter windows' token. + * + * @param acquireCount tokens count. + */ + void addOccupiedPass(int acquireCount); + + /** + * Get current occupied pass QPS. + * + * @return current occupied pass QPS + */ + double occupiedPassQps(); +} diff --git a/sentinel-core/src/main/java/com/alibaba/csp/sentinel/node/OccupyTimeoutProperty.java b/sentinel-core/src/main/java/com/alibaba/csp/sentinel/node/OccupyTimeoutProperty.java new file mode 100644 index 00000000..a2d7ecf3 --- /dev/null +++ b/sentinel-core/src/main/java/com/alibaba/csp/sentinel/node/OccupyTimeoutProperty.java @@ -0,0 +1,79 @@ +/* + * Copyright 1999-2019 Alibaba Group Holding Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alibaba.csp.sentinel.node; + +import com.alibaba.csp.sentinel.log.RecordLog; +import com.alibaba.csp.sentinel.property.SentinelProperty; +import com.alibaba.csp.sentinel.property.SimplePropertyListener; + +/** + * @author jialiang.linjl + * @author Carpenter Lee + * @since 1.5.0 + */ +public class OccupyTimeoutProperty { + + /** + *

+ * Max occupy timeout in milliseconds. Requests with priority can occupy tokens of the future statistic + * window, and {@code occupyTimeout} limit the max time length that can be occupied. + *

+ *

+ * Note that the timeout value should never be greeter than {@link IntervalProperty#INTERVAL}. + *

+ * DO NOT MODIFY this value directly, use {@link #updateTimeout(int)}, + * otherwise the modification will not take effect. + */ + private static volatile int occupyTimeout = 500; + + public static void register2Property(SentinelProperty property) { + property.addListener(new SimplePropertyListener() { + @Override + public void configUpdate(Integer value) { + if (value != null) { + updateTimeout(value); + } + } + }); + } + + public static int getOccupyTimeout() { + return occupyTimeout; + } + + /** + * Update the timeout value.
+ * Note that the time out should never greeter than {@link IntervalProperty#INTERVAL}, + * or it will be ignored. + * + * @param newInterval new value. + */ + public static void updateTimeout(int newInterval) { + if (newInterval < 0) { + RecordLog.warn("[OccupyTimeoutProperty] Illegal timeout value will be ignored: " + occupyTimeout); + return; + } + if (newInterval > IntervalProperty.INTERVAL) { + RecordLog.warn("[OccupyTimeoutProperty] Illegal timeout value will be ignored: " + occupyTimeout + + ", should <= " + IntervalProperty.INTERVAL); + return; + } + if (newInterval != occupyTimeout) { + occupyTimeout = newInterval; + } + RecordLog.info("[OccupyTimeoutProperty] occupyTimeout updated to: " + occupyTimeout); + } +} diff --git a/sentinel-core/src/main/java/com/alibaba/csp/sentinel/node/StatisticNode.java b/sentinel-core/src/main/java/com/alibaba/csp/sentinel/node/StatisticNode.java index 9f6cd336..44d2dc76 100755 --- a/sentinel-core/src/main/java/com/alibaba/csp/sentinel/node/StatisticNode.java +++ b/sentinel-core/src/main/java/com/alibaba/csp/sentinel/node/StatisticNode.java @@ -99,7 +99,7 @@ public class StatisticNode implements Node { * Holds statistics of the recent 60 seconds. The windowLengthInMs is deliberately set to 1000 milliseconds, * meaning each bucket per second, in this way we can get accurate statistics of each second. */ - private transient Metric rollingCounterInMinute = new ArrayMetric(60, 60 * 1000); + private transient Metric rollingCounterInMinute = new ArrayMetric(60, 60 * 1000, false); /** * The counter for thread count. @@ -116,7 +116,7 @@ public class StatisticNode implements Node { // The fetch operation is thread-safe under a single-thread scheduler pool. long currentTime = TimeUtil.currentTimeMillis(); currentTime = currentTime - currentTime % 1000; - Map metrics = new ConcurrentHashMap(); + Map metrics = new ConcurrentHashMap<>(); List nodesOfEverySecond = rollingCounterInMinute.details(); long newLastFetchTime = lastFetchTime; // Iterate metrics of all resources, filter valid metrics (not-empty and up-to-date). @@ -137,7 +137,7 @@ public class StatisticNode implements Node { private boolean isValidMetricNode(MetricNode node) { return node.getPassQps() > 0 || node.getBlockQps() > 0 || node.getSuccessQps() > 0 - || node.getExceptionQps() > 0 || node.getRt() > 0; + || node.getExceptionQps() > 0 || node.getRt() > 0 || node.getOccupiedPassQps() > 0; } @Override @@ -151,11 +151,6 @@ public class StatisticNode implements Node { return totalRequest; } - @Override - public long totalPass() { - return rollingCounterInMinute.pass(); - } - @Override public long blockRequest() { return rollingCounterInMinute.block(); @@ -201,6 +196,11 @@ public class StatisticNode implements Node { return rollingCounterInSecond.pass() / rollingCounterInSecond.getWindowIntervalInSec(); } + @Override + public long totalPass() { + return rollingCounterInMinute.pass(); + } + @Override public double successQps() { return rollingCounterInSecond.success() / rollingCounterInSecond.getWindowIntervalInSec(); @@ -211,6 +211,11 @@ public class StatisticNode implements Node { return rollingCounterInSecond.maxSuccess() * rollingCounterInSecond.getSampleCount(); } + @Override + public double occupiedPassQps() { + return rollingCounterInSecond.occupiedPass() / rollingCounterInSecond.getWindowIntervalInSec(); + } + @Override public double avgRt() { long successCount = rollingCounterInSecond.success(); @@ -256,7 +261,6 @@ public class StatisticNode implements Node { public void increaseExceptionQps(int count) { rollingCounterInSecond.addException(count); rollingCounterInMinute.addException(count); - } @Override @@ -271,6 +275,57 @@ public class StatisticNode implements Node { @Override public void debug() { - rollingCounterInSecond.debugQps(); + rollingCounterInSecond.debug(); + } + + @Override + public long tryOccupyNext(long currentTime, int acquireCount, double threshold) { + double maxCount = threshold * IntervalProperty.INTERVAL / 1000; + long currentBorrow = rollingCounterInSecond.waiting(); + if (currentBorrow >= maxCount) { + return OccupyTimeoutProperty.getOccupyTimeout(); + } + + int windowLength = IntervalProperty.INTERVAL / SampleCountProperty.SAMPLE_COUNT; + long earliestTime = currentTime - currentTime % windowLength + windowLength - IntervalProperty.INTERVAL; + + int idx = 0; + /* + * Note: here {@code currentPass} may be less than it really is NOW, because time difference + * since call rollingCounterInSecond.pass(). So in high concurrency, the following code may + * lead more tokens be borrowed. + */ + long currentPass = rollingCounterInSecond.pass(); + while (earliestTime < currentTime) { + long waitInMs = idx * windowLength + windowLength - currentTime % windowLength; + if (waitInMs >= OccupyTimeoutProperty.getOccupyTimeout()) { + break; + } + long windowPass = rollingCounterInSecond.getWindowPass(earliestTime); + if (currentPass + currentBorrow + acquireCount - windowPass <= maxCount) { + return waitInMs; + } + earliestTime += windowLength; + currentPass -= windowPass; + idx++; + } + + return OccupyTimeoutProperty.getOccupyTimeout(); + } + + @Override + public long waiting() { + return rollingCounterInSecond.waiting(); + } + + @Override + public void addWaitingRequest(long futureTime, int acquireCount) { + rollingCounterInSecond.addWaiting(futureTime, acquireCount); + } + + @Override + public void addOccupiedPass(int acquireCount) { + rollingCounterInMinute.addOccupiedPass(acquireCount); + rollingCounterInMinute.addPass(acquireCount); } } diff --git a/sentinel-core/src/main/java/com/alibaba/csp/sentinel/node/metric/MetricNode.java b/sentinel-core/src/main/java/com/alibaba/csp/sentinel/node/metric/MetricNode.java index 3f6970ee..e1a6707c 100755 --- a/sentinel-core/src/main/java/com/alibaba/csp/sentinel/node/metric/MetricNode.java +++ b/sentinel-core/src/main/java/com/alibaba/csp/sentinel/node/metric/MetricNode.java @@ -34,12 +34,25 @@ public class MetricNode { private long exceptionQps; private long rt; + /** + * @since 1.5.0 + */ + private long occupiedPassQps; + private String resource; public long getTimestamp() { return timestamp; } + public long getOccupiedPassQps() { + return occupiedPassQps; + } + + public void setOccupiedPassQps(long occupiedPassQps) { + this.occupiedPassQps = occupiedPassQps; + } + public void setTimestamp(long timestamp) { this.timestamp = timestamp; } @@ -94,22 +107,17 @@ public class MetricNode { @Override public String toString() { - return "MetricNode{" + - "timestamp=" + timestamp + - ", passQps=" + passQps + - ", blockQps=" + blockQps + - ", successQps=" + successQps + - ", exceptionQps=" + exceptionQps + - ", rt=" + rt + - ", resource='" + resource + '\'' + - '}'; + return "MetricNode{" + "timestamp=" + timestamp + ", passQps=" + passQps + ", blockQps=" + blockQps + + ", successQps=" + successQps + ", exceptionQps=" + exceptionQps + ", rt=" + rt + + ", occupiedPassQps=" + occupiedPassQps + ", resource='" + + resource + '\'' + '}'; } /** - * To formatting string. All "|" in {@link #resource} will be replaced with "_", format is: - *
+ * To formatting string. All "|" in {@link #resource} will be replaced with + * "_", format is:
* - * timestamp|resource|passQps|blockQps|successQps|exceptionQps|rt + * timestamp|resource|passQps|blockQps|successQps|exceptionQps|rt|occupiedPassQps * * * @return string format of this. @@ -123,12 +131,13 @@ public class MetricNode { sb.append(blockQps).append("|"); sb.append(successQps).append("|"); sb.append(exceptionQps).append("|"); - sb.append(rt); + sb.append(rt).append("|"); + sb.append(occupiedPassQps); return sb.toString(); } /** - * Parse {@link MetricNode} from thin string, see {@link #toThinString()} ()} + * Parse {@link MetricNode} from thin string, see {@link #toThinString()} * * @param line * @return @@ -143,14 +152,17 @@ public class MetricNode { node.setSuccessQps(Long.parseLong(strs[4])); node.setExceptionQps(Long.parseLong(strs[5])); node.setRt(Long.parseLong(strs[6])); + if (strs.length == 8) { + node.setOccupiedPassQps(Long.parseLong(strs[7])); + } return node; } /** - * To formatting string. All "|" in {@link MetricNode#resource} will be replaced with "_", format is: - *
+ * To formatting string. All "|" in {@link MetricNode#resource} will be + * replaced with "_", format is:
* - * timestamp|yyyy-MM-dd HH:mm:ss|resource|passQps|blockQps|successQps|exceptionQps|rt\n + * timestamp|yyyy-MM-dd HH:mm:ss|resource|passQps|blockQps|successQps|exceptionQps|rt|occupiedPassQps\n * * * @return string format of this. @@ -167,7 +179,8 @@ public class MetricNode { sb.append(getBlockQps()).append("|"); sb.append(getSuccessQps()).append("|"); sb.append(getExceptionQps()).append("|"); - sb.append(getRt()); + sb.append(getRt()).append("|"); + sb.append(getOccupiedPassQps()); sb.append('\n'); return sb.toString(); } @@ -189,6 +202,9 @@ public class MetricNode { node.setSuccessQps(Long.parseLong(strs[5])); node.setExceptionQps(Long.parseLong(strs[6])); node.setRt(Long.parseLong(strs[7])); + if (strs.length == 9) { + node.setOccupiedPassQps(Long.parseLong(strs[8])); + } return node; } diff --git a/sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/block/flow/FlowRuleChecker.java b/sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/block/flow/FlowRuleChecker.java index 7d552ddb..4799bfb3 100644 --- a/sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/block/flow/FlowRuleChecker.java +++ b/sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/block/flow/FlowRuleChecker.java @@ -61,7 +61,7 @@ final class FlowRuleChecker { return true; } - return rule.getRater().canPass(selectedNode, acquireCount); + return rule.getRater().canPass(selectedNode, acquireCount, prioritized); } static Node selectReferenceNode(FlowRule rule, Context context, DefaultNode node) { diff --git a/sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/block/flow/PriorityWaitException.java b/sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/block/flow/PriorityWaitException.java new file mode 100644 index 00000000..aee13fae --- /dev/null +++ b/sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/block/flow/PriorityWaitException.java @@ -0,0 +1,40 @@ +/* + * Copyright 1999-2018 Alibaba Group Holding Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alibaba.csp.sentinel.slots.block.flow; + +/** + * An exception that marks previous prioritized request has been waiting till now, then should pass. + * + * @author jialiang.linjl + * @since 1.5.0 + */ +public class PriorityWaitException extends RuntimeException { + + private final long waitInMs; + + public PriorityWaitException(long waitInMs) { + this.waitInMs = waitInMs; + } + + public long getWaitInMs() { + return waitInMs; + } + + @Override + public Throwable fillInStackTrace() { + return this; + } +} diff --git a/sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/block/flow/controller/DefaultController.java b/sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/block/flow/controller/DefaultController.java index f2aef64c..e1847611 100755 --- a/sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/block/flow/controller/DefaultController.java +++ b/sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/block/flow/controller/DefaultController.java @@ -16,13 +16,17 @@ package com.alibaba.csp.sentinel.slots.block.flow.controller; import com.alibaba.csp.sentinel.node.Node; +import com.alibaba.csp.sentinel.node.OccupyTimeoutProperty; import com.alibaba.csp.sentinel.slots.block.RuleConstant; +import com.alibaba.csp.sentinel.slots.block.flow.PriorityWaitException; import com.alibaba.csp.sentinel.slots.block.flow.TrafficShapingController; +import com.alibaba.csp.sentinel.util.TimeUtil; /** * Default throttling controller (immediately reject strategy). * * @author jialiang.linjl + * @author Eric Zhao */ public class DefaultController implements TrafficShapingController { @@ -45,9 +49,22 @@ public class DefaultController implements TrafficShapingController { public boolean canPass(Node node, int acquireCount, boolean prioritized) { int curCount = avgUsedTokens(node); if (curCount + acquireCount > count) { + if (prioritized && grade == RuleConstant.FLOW_GRADE_QPS) { + long currentTime; + long waitInMs; + currentTime = TimeUtil.currentTimeMillis(); + waitInMs = node.tryOccupyNext(currentTime, acquireCount, count); + if (waitInMs < OccupyTimeoutProperty.getOccupyTimeout()) { + node.addWaitingRequest(currentTime + waitInMs, acquireCount); + node.addOccupiedPass(acquireCount); + sleep(waitInMs); + + // PriorityWaitException indicates that the request will pass after waiting for {@link @waitInMs}. + throw new PriorityWaitException(waitInMs); + } + } return false; } - return true; } @@ -55,10 +72,10 @@ public class DefaultController implements TrafficShapingController { if (node == null) { return DEFAULT_AVG_USED_TOKENS; } - return grade == RuleConstant.FLOW_GRADE_THREAD ? node.curThreadNum() : (int) node.passQps(); + return grade == RuleConstant.FLOW_GRADE_THREAD ? node.curThreadNum() : (int)(node.passQps()); } - private void sleep(int timeMillis) { + private void sleep(long timeMillis) { try { Thread.sleep(timeMillis); } catch (InterruptedException e) { diff --git a/sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/statistic/MetricEvent.java b/sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/statistic/MetricEvent.java index f05dc966..73dbd272 100644 --- a/sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/statistic/MetricEvent.java +++ b/sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/statistic/MetricEvent.java @@ -31,7 +31,9 @@ public enum MetricEvent { EXCEPTION, SUCCESS, RT, - OCCUPIED_PASS, - OCCUPIED_BLOCK, - WAITING + + /** + * Passed in future quota (pre-occupied, since 1.5.0). + */ + OCCUPIED_PASS } diff --git a/sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/statistic/StatisticSlot.java b/sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/statistic/StatisticSlot.java index 3f706790..d35a8bdd 100755 --- a/sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/statistic/StatisticSlot.java +++ b/sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/statistic/StatisticSlot.java @@ -19,6 +19,7 @@ import java.util.Collection; import com.alibaba.csp.sentinel.slotchain.ProcessorSlotEntryCallback; import com.alibaba.csp.sentinel.slotchain.ProcessorSlotExitCallback; +import com.alibaba.csp.sentinel.slots.block.flow.PriorityWaitException; import com.alibaba.csp.sentinel.util.TimeUtil; import com.alibaba.csp.sentinel.Constants; import com.alibaba.csp.sentinel.EntryType; @@ -70,6 +71,21 @@ public class StatisticSlot extends AbstractLinkedProcessorSlot { Constants.ENTRY_NODE.addPassRequest(count); } + // Handle pass event with registered entry callback handlers. + for (ProcessorSlotEntryCallback handler : StatisticSlotCallbackRegistry.getEntryCallbacks()) { + handler.onPass(context, resourceWrapper, node, count, args); + } + } catch (PriorityWaitException ex) { + node.increaseThreadNum(); + if (context.getCurEntry().getOriginNode() != null) { + // Add count for origin node. + context.getCurEntry().getOriginNode().increaseThreadNum(); + } + + if (resourceWrapper.getType() == EntryType.IN) { + // Add count for global inbound entry node for global statistics. + Constants.ENTRY_NODE.increaseThreadNum(); + } // Handle pass event with registered entry callback handlers. for (ProcessorSlotEntryCallback handler : StatisticSlotCallbackRegistry.getEntryCallbacks()) { handler.onPass(context, resourceWrapper, node, count, args); diff --git a/sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/statistic/base/LeapArray.java b/sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/statistic/base/LeapArray.java index 2b089028..58a0dce0 100755 --- a/sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/statistic/base/LeapArray.java +++ b/sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/statistic/base/LeapArray.java @@ -28,7 +28,7 @@ import com.alibaba.csp.sentinel.util.TimeUtil; * Basic data structure for statistic metrics in Sentinel. *

*

- * Leap array use sliding window algorithm to count data. Each bucket cover {code windowLengthInMs} time span, + * Leap array use sliding window algorithm to count data. Each bucket cover {@code windowLengthInMs} time span, * and the total time span is {@link #intervalInMs}, so the total bucket amount is: * {@code sampleCount = intervalInMs / windowLengthInMs}. *

@@ -54,8 +54,8 @@ public abstract class LeapArray { /** * The total bucket count is: {@code sampleCount = intervalInMs / windowLengthInMs}. * - * @param sampleCount bucket count of the sliding window - * @param intervalInMs the total time interval of this {@link LeapArray} in milliseconds + * @param sampleCount bucket count of the sliding window + * @param intervalInMs the total time interval of this {@link LeapArray} in milliseconds */ public LeapArray(int sampleCount, int intervalInMs) { AssertUtil.isTrue(sampleCount > 0, "bucket count is invalid: " + sampleCount); @@ -66,7 +66,7 @@ public abstract class LeapArray { this.intervalInMs = intervalInMs; this.sampleCount = sampleCount; - this.array = new AtomicReferenceArray>(sampleCount); + this.array = new AtomicReferenceArray<>(sampleCount); } /** @@ -81,9 +81,10 @@ public abstract class LeapArray { /** * Create a new statistic value for bucket. * + * @param timeMillis current time in milliseconds * @return the new empty bucket */ - public abstract T newEmptyBucket(); + public abstract T newEmptyBucket(long timeMillis); /** * Reset given bucket to provided start time and reset the value. @@ -94,7 +95,7 @@ public abstract class LeapArray { */ protected abstract WindowWrap resetWindowTo(WindowWrap windowWrap, long startTime); - protected int calculateTimeIdx(/*@Valid*/ long timeMillis) { + private int calculateTimeIdx(/*@Valid*/ long timeMillis) { long timeId = timeMillis / windowLengthInMs; // Calculate current index so we can map the timestamp to the leap array. return (int)(timeId % array.length()); @@ -141,7 +142,7 @@ public abstract class LeapArray { * then try to update circular array via a CAS operation. Only one thread can * succeed to update, while other threads yield its time slice. */ - WindowWrap window = new WindowWrap(windowLengthInMs, windowStart, newEmptyBucket()); + WindowWrap window = new WindowWrap(windowLengthInMs, windowStart, newEmptyBucket(timeMillis)); if (array.compareAndSet(idx, null, window)) { // Successfully updated, return the created bucket. return window; @@ -193,7 +194,7 @@ public abstract class LeapArray { } } else if (windowStart < old.windowStart()) { // Should not go through here, as the provided time is already behind. - return new WindowWrap(windowLengthInMs, windowStart, newEmptyBucket()); + return new WindowWrap(windowLengthInMs, windowStart, newEmptyBucket(timeMillis)); } } } @@ -236,21 +237,22 @@ public abstract class LeapArray { /** * Get statistic value from bucket for provided timestamp. * - * @param time a valid timestamp in milliseconds + * @param timeMillis a valid timestamp in milliseconds * @return the statistic value if bucket for provided timestamp is up-to-date; otherwise null */ - public T getWindowValue(long time) { - if (time < 0) { + public T getWindowValue(long timeMillis) { + if (timeMillis < 0) { return null; } - int idx = calculateTimeIdx(time); + int idx = calculateTimeIdx(timeMillis); - WindowWrap old = array.get(idx); - if (old == null || isWindowDeprecated(old)) { + WindowWrap bucket = array.get(idx); + + if (bucket == null || !bucket.isTimeInWindow(timeMillis)) { return null; } - return old.value(); + return bucket.value(); } /** @@ -260,8 +262,12 @@ public abstract class LeapArray { * @param windowWrap a non-null bucket * @return true if the bucket is deprecated; otherwise false */ - protected boolean isWindowDeprecated(/*@NonNull*/ WindowWrap windowWrap) { - return TimeUtil.currentTimeMillis() - windowWrap.windowStart() >= intervalInMs; + public boolean isWindowDeprecated(/*@NonNull*/ WindowWrap windowWrap) { + return isWindowDeprecated(TimeUtil.currentTimeMillis(), windowWrap); + } + + public boolean isWindowDeprecated(long time, WindowWrap windowWrap) { + return time - windowWrap.windowStart() > intervalInMs; } /** @@ -271,12 +277,36 @@ public abstract class LeapArray { * @return valid bucket list for entire sliding window. */ public List> list() { + return list(TimeUtil.currentTimeMillis()); + } + + public List> list(long validTime) { int size = array.length(); List> result = new ArrayList>(size); for (int i = 0; i < size; i++) { WindowWrap windowWrap = array.get(i); - if (windowWrap == null || isWindowDeprecated(windowWrap)) { + if (windowWrap == null || isWindowDeprecated(validTime, windowWrap)) { + continue; + } + result.add(windowWrap); + } + + return result; + } + + /** + * Get all buckets for entire sliding window including deprecated buckets. + * + * @return all buckets for entire sliding window + */ + public List> listAll() { + int size = array.length(); + List> result = new ArrayList>(size); + + for (int i = 0; i < size; i++) { + WindowWrap windowWrap = array.get(i); + if (windowWrap == null) { continue; } result.add(windowWrap); @@ -292,12 +322,19 @@ public abstract class LeapArray { * @return aggregated value list for entire sliding window */ public List values() { + return values(TimeUtil.currentTimeMillis()); + } + + public List values(long timeMillis) { + if (timeMillis < 0) { + return new ArrayList(); + } int size = array.length(); List result = new ArrayList(size); for (int i = 0; i < size; i++) { WindowWrap windowWrap = array.get(i); - if (windowWrap == null || isWindowDeprecated(windowWrap)) { + if (windowWrap == null || isWindowDeprecated(timeMillis, windowWrap)) { continue; } result.add(windowWrap.value()); @@ -359,4 +396,24 @@ public abstract class LeapArray { public double getIntervalInSecond() { return intervalInMs / 1000.0; } + + public void debug(long time) { + StringBuilder sb = new StringBuilder(); + List> lists = list(time); + sb.append("Thread_").append(Thread.currentThread().getId()).append("_"); + for (WindowWrap window : lists) { + sb.append(window.windowStart()).append(":").append(window.value().toString()); + } + System.out.println(sb.toString()); + } + + public long currentWaiting() { + // TODO: default method. Should remove this later. + return 0; + } + + public void addWaiting(long time, int acquireCount) { + // Do nothing by default. + throw new UnsupportedOperationException(); + } } diff --git a/sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/statistic/base/UnaryLeapArray.java b/sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/statistic/base/UnaryLeapArray.java index 01ab8cf3..2d92dd81 100644 --- a/sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/statistic/base/UnaryLeapArray.java +++ b/sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/statistic/base/UnaryLeapArray.java @@ -25,7 +25,7 @@ public class UnaryLeapArray extends LeapArray { } @Override - public LongAdder newEmptyBucket() { + public LongAdder newEmptyBucket(long time) { return new LongAdder(); } diff --git a/sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/statistic/base/WindowWrap.java b/sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/statistic/base/WindowWrap.java index 8edb2277..a020159a 100755 --- a/sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/statistic/base/WindowWrap.java +++ b/sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/statistic/base/WindowWrap.java @@ -77,6 +77,17 @@ public class WindowWrap { return this; } + /** + * Check whether given timestamp is in current bucket. + * + * @param timeMillis valid timestamp in ms + * @return true if the given time is in current bucket, otherwise false + * @since 1.5.0 + */ + public boolean isTimeInWindow(long timeMillis) { + return windowStart <= timeMillis && timeMillis < windowStart + windowLengthInMs; + } + @Override public String toString() { return "WindowWrap{" + diff --git a/sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/statistic/data/MetricBucket.java b/sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/statistic/data/MetricBucket.java index ab1dc796..ec3f2afa 100755 --- a/sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/statistic/data/MetricBucket.java +++ b/sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/statistic/data/MetricBucket.java @@ -40,6 +40,15 @@ public class MetricBucket { initMinRt(); } + public MetricBucket reset(MetricBucket bucket) { + for (MetricEvent event : MetricEvent.values()) { + counters[event.ordinal()].reset(); + counters[event.ordinal()].add(bucket.get(event)); + } + initMinRt(); + return this; + } + private void initMinRt() { this.minRt = Constants.TIME_DROP_VALVE; } @@ -70,6 +79,10 @@ public class MetricBucket { return get(MetricEvent.PASS); } + public long occupiedPass() { + return get(MetricEvent.OCCUPIED_PASS); + } + public long block() { return get(MetricEvent.BLOCK); } @@ -94,6 +107,10 @@ public class MetricBucket { add(MetricEvent.PASS, n); } + public void addOccupiedPass(int n) { + add(MetricEvent.OCCUPIED_PASS, n); + } + public void addException(int n) { add(MetricEvent.EXCEPTION, n); } @@ -114,4 +131,9 @@ public class MetricBucket { minRt = rt; } } + + @Override + public String toString() { + return "p: " + pass() + ", b: " + block() + ", w: " + occupiedPass(); + } } diff --git a/sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/statistic/metric/ArrayMetric.java b/sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/statistic/metric/ArrayMetric.java index cadec6ea..f4c2a073 100755 --- a/sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/statistic/metric/ArrayMetric.java +++ b/sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/statistic/metric/ArrayMetric.java @@ -20,27 +20,38 @@ import java.util.List; import com.alibaba.csp.sentinel.Constants; import com.alibaba.csp.sentinel.node.metric.MetricNode; +import com.alibaba.csp.sentinel.slots.statistic.MetricEvent; +import com.alibaba.csp.sentinel.slots.statistic.base.LeapArray; import com.alibaba.csp.sentinel.slots.statistic.data.MetricBucket; import com.alibaba.csp.sentinel.slots.statistic.base.WindowWrap; +import com.alibaba.csp.sentinel.slots.statistic.metric.occupy.OccupiableBucketLeapArray; /** - * The basic metric class in Sentinel using a {@link MetricsLeapArray} internal. + * The basic metric class in Sentinel using a {@link BucketLeapArray} internal. * * @author jialiang.linjl * @author Eric Zhao */ public class ArrayMetric implements Metric { - private final MetricsLeapArray data; + private final LeapArray data; public ArrayMetric(int sampleCount, int intervalInMs) { - this.data = new MetricsLeapArray(sampleCount, intervalInMs); + this.data = new OccupiableBucketLeapArray(sampleCount, intervalInMs); + } + + public ArrayMetric(int sampleCount, int intervalInMs, boolean enableOccupy) { + if (enableOccupy) { + this.data = new OccupiableBucketLeapArray(sampleCount, intervalInMs); + } else { + this.data = new BucketLeapArray(sampleCount, intervalInMs); + } } /** * For unit test. */ - public ArrayMetric(MetricsLeapArray array) { + public ArrayMetric(LeapArray array) { this.data = array; } @@ -104,6 +115,17 @@ public class ArrayMetric implements Metric { return pass; } + @Override + public long occupiedPass() { + data.currentWindow(); + long pass = 0; + List list = data.values(); + for (MetricBucket window : list) { + pass += window.occupiedPass(); + } + return pass; + } + @Override public long rt() { data.currentWindow(); @@ -133,7 +155,8 @@ public class ArrayMetric implements Metric { public List details() { List details = new ArrayList(); data.currentWindow(); - for (WindowWrap window : data.list()) { + List> list = data.list(); + for (WindowWrap window : list) { if (window == null) { continue; } @@ -141,14 +164,16 @@ public class ArrayMetric implements Metric { node.setBlockQps(window.value().block()); node.setExceptionQps(window.value().exception()); node.setPassQps(window.value().pass()); - long passQps = window.value().success(); - node.setSuccessQps(passQps); - if (passQps != 0) { - node.setRt(window.value().rt() / passQps); + long successQps = window.value().success(); + node.setSuccessQps(successQps); + if (successQps != 0) { + node.setRt(window.value().rt() / successQps); } else { node.setRt(window.value().rt()); } node.setTimestamp(window.windowStart()); + node.setOccupiedPassQps(window.value().occupiedPass()); + details.add(node); } @@ -158,7 +183,7 @@ public class ArrayMetric implements Metric { @Override public MetricBucket[] windows() { data.currentWindow(); - return data.values().toArray(new MetricBucket[data.values().size()]); + return data.values().toArray(new MetricBucket[0]); } @Override @@ -173,6 +198,17 @@ public class ArrayMetric implements Metric { wrap.value().addBlock(count); } + @Override + public void addWaiting(long time, int acquireCount) { + data.addWaiting(time, acquireCount); + } + + @Override + public void addOccupiedPass(int acquireCount) { + WindowWrap wrap = data.currentWindow(); + wrap.value().addOccupiedPass(acquireCount); + } + @Override public void addSuccess(int count) { WindowWrap wrap = data.currentWindow(); @@ -192,18 +228,8 @@ public class ArrayMetric implements Metric { } @Override - public void debugQps() { - data.currentWindow(); - StringBuilder sb = new StringBuilder(); - sb.append(Thread.currentThread().getId()).append("_"); - for (WindowWrap windowWrap : data.list()) { - - sb.append(windowWrap.windowStart()).append(":").append(windowWrap.value().pass()).append(":") - .append(windowWrap.value().block()); - sb.append(","); - - } - System.out.println(sb); + public void debug() { + data.debug(System.currentTimeMillis()); } @Override @@ -226,6 +252,55 @@ public class ArrayMetric implements Metric { return wrap.value().pass(); } + public void add(MetricEvent event, long count) { + data.currentWindow().value().add(event, count); + } + + public long getCurrentCount(MetricEvent event) { + return data.currentWindow().value().get(event); + } + + /** + * Get total sum for provided event in {@code intervalInSec}. + * + * @param event event to calculate + * @return total sum for event + */ + public long getSum(MetricEvent event) { + data.currentWindow(); + long sum = 0; + + List buckets = data.values(); + for (MetricBucket bucket : buckets) { + sum += bucket.get(event); + } + return sum; + } + + /** + * Get average count for provided event per second. + * + * @param event event to calculate + * @return average count per second for event + */ + public double getAvg(MetricEvent event) { + return getSum(event) / data.getIntervalInSecond(); + } + + @Override + public long getWindowPass(long timeMillis) { + MetricBucket bucket = data.getWindowValue(timeMillis); + if (bucket == null) { + return 0L; + } + return bucket.pass(); + } + + @Override + public long waiting() { + return data.currentWaiting(); + } + @Override public double getWindowIntervalInSec() { return data.getIntervalInSecond(); diff --git a/sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/statistic/metric/MetricsLeapArray.java b/sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/statistic/metric/BucketLeapArray.java similarity index 88% rename from sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/statistic/metric/MetricsLeapArray.java rename to sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/statistic/metric/BucketLeapArray.java index 1cee0dd6..5f165cc3 100755 --- a/sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/statistic/metric/MetricsLeapArray.java +++ b/sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/statistic/metric/BucketLeapArray.java @@ -16,24 +16,24 @@ package com.alibaba.csp.sentinel.slots.statistic.metric; import com.alibaba.csp.sentinel.slots.statistic.base.LeapArray; -import com.alibaba.csp.sentinel.slots.statistic.data.MetricBucket; import com.alibaba.csp.sentinel.slots.statistic.base.WindowWrap; +import com.alibaba.csp.sentinel.slots.statistic.data.MetricBucket; /** * The fundamental data structure for metric statistics in a time span. * - * @see LeapArray * @author jialiang.linjl * @author Eric Zhao + * @see LeapArray */ -public class MetricsLeapArray extends LeapArray { +public class BucketLeapArray extends LeapArray { - public MetricsLeapArray(int sampleCount, int intervalInMs) { + public BucketLeapArray(int sampleCount, int intervalInMs) { super(sampleCount, intervalInMs); } @Override - public MetricBucket newEmptyBucket() { + public MetricBucket newEmptyBucket(long time) { return new MetricBucket(); } diff --git a/sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/statistic/metric/Metric.java b/sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/statistic/metric/Metric.java index a3fb933b..f6a73f5f 100755 --- a/sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/statistic/metric/Metric.java +++ b/sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/statistic/metric/Metric.java @@ -26,7 +26,7 @@ import com.alibaba.csp.sentinel.slots.statistic.data.MetricBucket; * @author jialiang.linjl * @author Eric Zhao */ -public interface Metric { +public interface Metric extends DebugSupport { /** * Get total success count. @@ -57,7 +57,7 @@ public interface Metric { long block(); /** - * Get total pass count. + * Get total pass count. not include {@link #occupiedPass()} * * @return pass count */ @@ -92,22 +92,30 @@ public interface Metric { MetricBucket[] windows(); /** - * Increment by one the current exception count. + * Add current exception count. + * + * @param n count to add */ void addException(int n); /** - * Increment by one the current block count. + * Add current block count. + * + * @param n count to add */ void addBlock(int n); /** - * Increment by one the current success count. + * Add current completed count. + * + * @param n count to add */ void addSuccess(int n); /** - * Increment by one the current pass count. + * Add current pass count. + * + * @param n count to add */ void addPass(int n); @@ -118,13 +126,65 @@ public interface Metric { */ void addRT(long rt); + /** + * Get the sliding window length in seconds. + * + * @return the sliding window length + */ double getWindowIntervalInSec(); + /** + * Get sample count of the sliding window. + * + * @return sample count of the sliding window. + */ int getSampleCount(); - // Tool methods. + /** + * Note: this operation will not perform refreshing, so will not generate new buckets. + * + * @param timeMillis valid time in ms + * @return pass count of the bucket exactly associated to provided timestamp, or 0 if the timestamp is invalid + * @since 1.5.0 + */ + long getWindowPass(long timeMillis); - void debugQps(); + // Occupy-based (@since 1.5.0) + + /** + * Add occupied pass, which represents pass requests that borrow the latter windows' token. + * + * @param acquireCount tokens count. + * @since 1.5.0 + */ + void addOccupiedPass(int acquireCount); + + /** + * Add request that occupied. + * + * @param futureTime future timestamp that the acquireCount should be added on. + * @param acquireCount tokens count. + * @since 1.5.0 + */ + void addWaiting(long futureTime, int acquireCount); + + /** + * Get waiting pass account + * + * @return waiting pass count + * @since 1.5.0 + */ + long waiting(); + + /** + * Get occupied pass count. + * + * @return occupied pass count + * @since 1.5.0 + */ + long occupiedPass(); + + // Tool methods. long previousWindowBlock(); diff --git a/sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/statistic/metric/occupy/FutureBucketLeapArray.java b/sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/statistic/metric/occupy/FutureBucketLeapArray.java new file mode 100644 index 00000000..13d20c97 --- /dev/null +++ b/sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/statistic/metric/occupy/FutureBucketLeapArray.java @@ -0,0 +1,53 @@ +/* + * Copyright 1999-2018 Alibaba Group Holding Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alibaba.csp.sentinel.slots.statistic.metric.occupy; + +import com.alibaba.csp.sentinel.slots.statistic.base.LeapArray; +import com.alibaba.csp.sentinel.slots.statistic.base.WindowWrap; +import com.alibaba.csp.sentinel.slots.statistic.data.MetricBucket; + +/** + * A kind of {@code BucketLeapArray} that only reserves for future buckets. + * + * @author jialiang.linjl + * @since 1.5.0 + */ +public class FutureBucketLeapArray extends LeapArray { + + public FutureBucketLeapArray(int sampleCount, int intervalInMs) { + // This class is the original "BorrowBucketArray". + super(sampleCount, intervalInMs); + } + + @Override + public MetricBucket newEmptyBucket(long time) { + return new MetricBucket(); + } + + @Override + protected WindowWrap resetWindowTo(WindowWrap w, long startTime) { + // Update the start time and reset value. + w.resetTo(startTime); + w.value().reset(); + return w; + } + + @Override + public boolean isWindowDeprecated(long time, WindowWrap windowWrap) { + // Tricky: will only calculate for future. + return time >= windowWrap.windowStart(); + } +} diff --git a/sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/statistic/metric/occupy/OccupiableBucketLeapArray.java b/sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/statistic/metric/occupy/OccupiableBucketLeapArray.java new file mode 100644 index 00000000..b3650992 --- /dev/null +++ b/sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/statistic/metric/occupy/OccupiableBucketLeapArray.java @@ -0,0 +1,101 @@ +/* + * Copyright 1999-2018 Alibaba Group Holding Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alibaba.csp.sentinel.slots.statistic.metric.occupy; + +import java.util.List; + +import com.alibaba.csp.sentinel.slots.statistic.MetricEvent; +import com.alibaba.csp.sentinel.slots.statistic.base.LeapArray; +import com.alibaba.csp.sentinel.slots.statistic.base.WindowWrap; +import com.alibaba.csp.sentinel.slots.statistic.data.MetricBucket; + +/** + * @author jialiang.linjl + * @since 1.5.0 + */ +public class OccupiableBucketLeapArray extends LeapArray { + + private final FutureBucketLeapArray borrowArray; + + public OccupiableBucketLeapArray(int sampleCount, int intervalInMs) { + // This class is the original "CombinedBucketArray". + super(sampleCount, intervalInMs); + this.borrowArray = new FutureBucketLeapArray(sampleCount, intervalInMs); + } + + @Override + public MetricBucket newEmptyBucket(long time) { + MetricBucket newBucket = new MetricBucket(); + + MetricBucket borrowBucket = borrowArray.getWindowValue(time); + if (borrowBucket != null) { + newBucket.reset(borrowBucket); + } + + return newBucket; + } + + @Override + protected WindowWrap resetWindowTo(WindowWrap w, long time) { + // Update the start time and reset value. + w.resetTo(time); + MetricBucket borrowBucket = borrowArray.getWindowValue(time); + if (borrowBucket != null) { + w.value().reset(); + w.value().addPass((int)borrowBucket.pass()); + } else { + w.value().reset(); + } + + return w; + } + + @Override + public long currentWaiting() { + borrowArray.currentWindow(); + long currentWaiting = 0; + List list = borrowArray.values(); + + for (MetricBucket window : list) { + currentWaiting += window.pass(); + } + return currentWaiting; + } + + @Override + public void addWaiting(long time, int acquireCount) { + WindowWrap window = borrowArray.currentWindow(time); + window.value().add(MetricEvent.PASS, acquireCount); + } + + @Override + public void debug(long time) { + StringBuilder sb = new StringBuilder(); + List> lists = listAll(); + sb.append("a_Thread_").append(Thread.currentThread().getId()).append(" time=").append(time).append("; "); + for (WindowWrap window : lists) { + sb.append(window.windowStart()).append(":").append(window.value().toString()).append(";"); + } + sb.append("\n"); + + lists = borrowArray.listAll(); + sb.append("b_Thread_").append(Thread.currentThread().getId()).append(" time=").append(time).append("; "); + for (WindowWrap window : lists) { + sb.append(window.windowStart()).append(":").append(window.value().toString()).append(";"); + } + System.out.println(sb.toString()); + } +} diff --git a/sentinel-core/src/test/java/com/alibaba/csp/sentinel/node/StatisticNodeTest.java b/sentinel-core/src/test/java/com/alibaba/csp/sentinel/node/StatisticNodeTest.java index 02724342..44674dd1 100644 --- a/sentinel-core/src/test/java/com/alibaba/csp/sentinel/node/StatisticNodeTest.java +++ b/sentinel-core/src/test/java/com/alibaba/csp/sentinel/node/StatisticNodeTest.java @@ -39,7 +39,7 @@ import static org.junit.Assert.assertTrue; */ public class StatisticNodeTest { - private static final String LOG_PREFIX = "[StatisticNodeTest]"; + private static final String LOG_PREFIX = "[StatisticNodeTest] "; private static final SimpleDateFormat SDF = new SimpleDateFormat("yyyy-HH-dd HH:mm:ss"); @@ -74,7 +74,7 @@ public class StatisticNodeTest { tickEs.submit(new TickTask(node)); - List bizTasks = new ArrayList(taskBizExecuteCount); + List bizTasks = new ArrayList<>(taskBizExecuteCount); for (int i = 0; i < taskCount; i++) { bizTasks.add(new BizTask(node, taskBizExecuteCount)); } @@ -88,8 +88,8 @@ public class StatisticNodeTest { log("all biz task done, waiting 3 second to exit"); sleep(3000); - bizEs.shutdown(); - tickEs.shutdown(); + bizEs.shutdownNow(); + tickEs.shutdownNow(); // now no biz method execute, so there is no curThreadNum,passQps,successQps assertEquals(0, node.curThreadNum(), 0.01); @@ -192,7 +192,7 @@ public class StatisticNodeTest { log(SDF.format(new Date()) + " curThreadNum=" + node.curThreadNum() + ",passQps=" + node.passQps() + ",successQps=" + node.successQps() + ",maxSuccessQps=" + node.maxSuccessQps() + ",totalRequest=" + node.totalRequest() + ",totalSuccess=" + node.totalSuccess() - + ",avgRt=" + node.avgRt() + ",minRt=" + node.minRt()); + + ", avgRt=" + String.format("%.2f", node.avgRt()) + ", minRt=" + node.minRt()); } private static void log(Object obj) { diff --git a/sentinel-core/src/test/java/com/alibaba/csp/sentinel/slots/statistic/base/LeapArrayTest.java b/sentinel-core/src/test/java/com/alibaba/csp/sentinel/slots/statistic/base/LeapArrayTest.java index ebe821c0..b3ac551b 100644 --- a/sentinel-core/src/test/java/com/alibaba/csp/sentinel/slots/statistic/base/LeapArrayTest.java +++ b/sentinel-core/src/test/java/com/alibaba/csp/sentinel/slots/statistic/base/LeapArrayTest.java @@ -35,7 +35,7 @@ public class LeapArrayTest extends AbstractTimeBasedTest { int sampleCount = intervalInMs / windowLengthInMs; LeapArray leapArray = new LeapArray(sampleCount, intervalInMs) { @Override - public AtomicInteger newEmptyBucket() { + public AtomicInteger newEmptyBucket(long time) { return new AtomicInteger(0); } diff --git a/sentinel-core/src/test/java/com/alibaba/csp/sentinel/base/metric/ArrayMetricTest.java b/sentinel-core/src/test/java/com/alibaba/csp/sentinel/slots/statistic/metric/ArrayMetricTest.java old mode 100755 new mode 100644 similarity index 88% rename from sentinel-core/src/test/java/com/alibaba/csp/sentinel/base/metric/ArrayMetricTest.java rename to sentinel-core/src/test/java/com/alibaba/csp/sentinel/slots/statistic/metric/ArrayMetricTest.java index ee2cd5f7..80293d48 --- a/sentinel-core/src/test/java/com/alibaba/csp/sentinel/base/metric/ArrayMetricTest.java +++ b/sentinel-core/src/test/java/com/alibaba/csp/sentinel/slots/statistic/metric/ArrayMetricTest.java @@ -13,20 +13,18 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package com.alibaba.csp.sentinel.base.metric; +package com.alibaba.csp.sentinel.slots.statistic.metric; import java.util.ArrayList; +import com.alibaba.csp.sentinel.slots.statistic.base.WindowWrap; +import com.alibaba.csp.sentinel.slots.statistic.data.MetricBucket; + import org.junit.Test; -import com.alibaba.csp.sentinel.slots.statistic.data.MetricBucket; -import com.alibaba.csp.sentinel.slots.statistic.base.WindowWrap; -import com.alibaba.csp.sentinel.slots.statistic.metric.ArrayMetric; -import com.alibaba.csp.sentinel.slots.statistic.metric.MetricsLeapArray; - import static org.junit.Assert.*; - -import static org.mockito.Mockito.*; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; /** * Test cases for {@link ArrayMetric}. @@ -39,7 +37,7 @@ public class ArrayMetricTest { @Test public void testOperateArrayMetric() { - MetricsLeapArray leapArray = mock(MetricsLeapArray.class); + BucketLeapArray leapArray = mock(BucketLeapArray.class); final WindowWrap windowWrap = new WindowWrap(windowLengthInMs, 0, new MetricBucket()); when(leapArray.currentWindow()).thenReturn(windowWrap); when(leapArray.values()).thenReturn(new ArrayList() {{ add(windowWrap.value()); }}); diff --git a/sentinel-core/src/test/java/com/alibaba/csp/sentinel/base/metric/MetricsLeapArrayTest.java b/sentinel-core/src/test/java/com/alibaba/csp/sentinel/slots/statistic/metric/BucketLeapArrayTest.java old mode 100755 new mode 100644 similarity index 85% rename from sentinel-core/src/test/java/com/alibaba/csp/sentinel/base/metric/MetricsLeapArrayTest.java rename to sentinel-core/src/test/java/com/alibaba/csp/sentinel/slots/statistic/metric/BucketLeapArrayTest.java index 18cf7e6f..ec1c696f --- a/sentinel-core/src/test/java/com/alibaba/csp/sentinel/base/metric/MetricsLeapArrayTest.java +++ b/sentinel-core/src/test/java/com/alibaba/csp/sentinel/slots/statistic/metric/BucketLeapArrayTest.java @@ -13,7 +13,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package com.alibaba.csp.sentinel.base.metric; +package com.alibaba.csp.sentinel.slots.statistic.metric; import java.util.ArrayList; import java.util.HashSet; @@ -21,22 +21,20 @@ import java.util.List; import java.util.Set; import java.util.concurrent.CountDownLatch; +import com.alibaba.csp.sentinel.slots.statistic.base.WindowWrap; import com.alibaba.csp.sentinel.slots.statistic.data.MetricBucket; import com.alibaba.csp.sentinel.util.TimeUtil; -import com.alibaba.csp.sentinel.slots.statistic.base.WindowWrap; -import com.alibaba.csp.sentinel.slots.statistic.metric.MetricsLeapArray; -import com.alibaba.csp.sentinel.test.AbstractTimeBasedTest; import org.junit.Test; import static org.junit.Assert.*; /** - * Test cases for {@link MetricsLeapArray}. + * Test cases for {@link BucketLeapArray}. * * @author Eric Zhao */ -public class MetricsLeapArrayTest extends AbstractTimeBasedTest { +public class BucketLeapArrayTest { private final int windowLengthInMs = 1000; private final int intervalInSec = 2; @@ -45,7 +43,7 @@ public class MetricsLeapArrayTest extends AbstractTimeBasedTest { @Test public void testNewWindow() { - MetricsLeapArray leapArray = new MetricsLeapArray(sampleCount, intervalInMs); + BucketLeapArray leapArray = new BucketLeapArray(sampleCount, intervalInMs); long time = TimeUtil.currentTimeMillis(); WindowWrap window = leapArray.currentWindow(time); @@ -57,7 +55,7 @@ public class MetricsLeapArrayTest extends AbstractTimeBasedTest { @Test public void testLeapArrayWindowStart() { - MetricsLeapArray leapArray = new MetricsLeapArray(sampleCount, intervalInMs); + BucketLeapArray leapArray = new BucketLeapArray(sampleCount, intervalInMs); long firstTime = TimeUtil.currentTimeMillis(); long previousWindowStart = firstTime - firstTime % windowLengthInMs; @@ -69,7 +67,7 @@ public class MetricsLeapArrayTest extends AbstractTimeBasedTest { @Test public void testWindowAfterOneInterval() { - MetricsLeapArray leapArray = new MetricsLeapArray(sampleCount, intervalInMs); + BucketLeapArray leapArray = new BucketLeapArray(sampleCount, intervalInMs); long firstTime = TimeUtil.currentTimeMillis(); long previousWindowStart = firstTime - firstTime % windowLengthInMs; WindowWrap window = leapArray.currentWindow(previousWindowStart); @@ -109,7 +107,7 @@ public class MetricsLeapArrayTest extends AbstractTimeBasedTest { @Deprecated public void testWindowDeprecatedRefresh() { - MetricsLeapArray leapArray = new MetricsLeapArray(sampleCount, intervalInMs); + BucketLeapArray leapArray = new BucketLeapArray(sampleCount, intervalInMs); final int len = sampleCount; long firstTime = TimeUtil.currentTimeMillis(); List> firstIterWindowList = new ArrayList>(len); @@ -129,7 +127,7 @@ public class MetricsLeapArrayTest extends AbstractTimeBasedTest { public void testMultiThreadUpdateEmptyWindow() throws Exception { final long time = TimeUtil.currentTimeMillis(); final int nThreads = 16; - final MetricsLeapArray leapArray = new MetricsLeapArray(sampleCount, intervalInMs); + final BucketLeapArray leapArray = new BucketLeapArray(sampleCount, intervalInMs); final CountDownLatch latch = new CountDownLatch(nThreads); Runnable task = new Runnable() { @Override @@ -150,8 +148,7 @@ public class MetricsLeapArrayTest extends AbstractTimeBasedTest { @Test public void testGetPreviousWindow() { - setCurrentMillis(System.currentTimeMillis()); - MetricsLeapArray leapArray = new MetricsLeapArray(sampleCount, intervalInMs); + BucketLeapArray leapArray = new BucketLeapArray(sampleCount, intervalInMs); long time = TimeUtil.currentTimeMillis(); WindowWrap previousWindow = leapArray.currentWindow(time); assertNull(leapArray.getPreviousWindow(time)); @@ -168,10 +165,8 @@ public class MetricsLeapArrayTest extends AbstractTimeBasedTest { final int windowLengthInMs = 100; final int intervalInMs = 1000; final int sampleCount = intervalInMs / windowLengthInMs; - - setCurrentMillis(System.currentTimeMillis()); - - MetricsLeapArray leapArray = new MetricsLeapArray(sampleCount, intervalInMs); + + BucketLeapArray leapArray = new BucketLeapArray(sampleCount, intervalInMs); long time = TimeUtil.currentTimeMillis(); Set> windowWraps = new HashSet>(); @@ -184,7 +179,7 @@ public class MetricsLeapArrayTest extends AbstractTimeBasedTest { assertTrue(windowWraps.contains(wrap)); } - sleep(windowLengthInMs + intervalInMs); + Thread.sleep(windowLengthInMs + intervalInMs); // This will replace the deprecated bucket, so all deprecated buckets will be reset. leapArray.currentWindow(time + windowLengthInMs + intervalInMs).value().addPass(1); @@ -198,8 +193,8 @@ public class MetricsLeapArrayTest extends AbstractTimeBasedTest { final int intervalInSec = 1; final int intervalInMs = intervalInSec * 1000; final int sampleCount = intervalInMs / windowLengthInMs; - - MetricsLeapArray leapArray = new MetricsLeapArray(sampleCount, intervalInMs); + + BucketLeapArray leapArray = new BucketLeapArray(sampleCount, intervalInMs); long time = TimeUtil.currentTimeMillis(); Set> windowWraps = new HashSet>(); @@ -207,7 +202,7 @@ public class MetricsLeapArrayTest extends AbstractTimeBasedTest { windowWraps.add(leapArray.currentWindow(time)); windowWraps.add(leapArray.currentWindow(time + windowLengthInMs)); - sleep(intervalInMs + windowLengthInMs * 3); + Thread.sleep(intervalInMs + windowLengthInMs * 3); List> list = leapArray.list(); for (WindowWrap wrap : list) { @@ -220,4 +215,4 @@ public class MetricsLeapArrayTest extends AbstractTimeBasedTest { assertEquals(1, leapArray.list().size()); } -} \ No newline at end of file +} diff --git a/sentinel-core/src/test/java/com/alibaba/csp/sentinel/slots/statistic/metric/FutureBucketLeapArrayTest.java b/sentinel-core/src/test/java/com/alibaba/csp/sentinel/slots/statistic/metric/FutureBucketLeapArrayTest.java new file mode 100644 index 00000000..42720504 --- /dev/null +++ b/sentinel-core/src/test/java/com/alibaba/csp/sentinel/slots/statistic/metric/FutureBucketLeapArrayTest.java @@ -0,0 +1,32 @@ +package com.alibaba.csp.sentinel.slots.statistic.metric; + +import com.alibaba.csp.sentinel.slots.statistic.metric.occupy.FutureBucketLeapArray; +import com.alibaba.csp.sentinel.util.TimeUtil; + +import org.junit.Test; + +import static org.junit.Assert.assertEquals; + +/** + * Test cases for {@link FutureBucketLeapArray}. + * + * @author jialiang.linjl + */ +public class FutureBucketLeapArrayTest { + + private final int windowLengthInMs = 200; + private final int intervalInSec = 2; + private final int intervalInMs = intervalInSec * 1000; + private final int sampleCount = intervalInMs / windowLengthInMs; + + @Test + public void testFutureMetricLeapArray() { + FutureBucketLeapArray array = new FutureBucketLeapArray(sampleCount, intervalInMs); + + long currentTime = TimeUtil.currentTimeMillis(); + for (int i = 0; i < intervalInSec * 1000; i = i + windowLengthInMs) { + array.currentWindow(i + currentTime).value().addPass(1); + assertEquals(array.values(i + currentTime).size(), 0); + } + } +} diff --git a/sentinel-core/src/test/java/com/alibaba/csp/sentinel/slots/statistic/metric/OccupiableBucketLeapArrayTest.java b/sentinel-core/src/test/java/com/alibaba/csp/sentinel/slots/statistic/metric/OccupiableBucketLeapArrayTest.java new file mode 100644 index 00000000..5c6cdbd8 --- /dev/null +++ b/sentinel-core/src/test/java/com/alibaba/csp/sentinel/slots/statistic/metric/OccupiableBucketLeapArrayTest.java @@ -0,0 +1,127 @@ +package com.alibaba.csp.sentinel.slots.statistic.metric; + +import java.util.List; +import java.util.concurrent.CountDownLatch; + +import com.alibaba.csp.sentinel.slots.statistic.base.WindowWrap; +import com.alibaba.csp.sentinel.slots.statistic.data.MetricBucket; +import com.alibaba.csp.sentinel.slots.statistic.metric.occupy.OccupiableBucketLeapArray; +import com.alibaba.csp.sentinel.util.TimeUtil; + +import org.junit.Test; + +import static org.junit.Assert.assertEquals; + +/** + * Test cases for {@link OccupiableBucketLeapArray}. + * + * @author jialiang.linjl + */ +public class OccupiableBucketLeapArrayTest { + + private final int windowLengthInMs = 200; + private final int intervalInSec = 2; + private final int intervalInMs = intervalInSec * 1000; + private final int sampleCount = intervalInMs / windowLengthInMs; + + @Test + public void testNewWindow() { + long currentTime = TimeUtil.currentTimeMillis(); + OccupiableBucketLeapArray leapArray = new OccupiableBucketLeapArray(sampleCount, intervalInMs); + + WindowWrap currentWindow = leapArray.currentWindow(currentTime); + currentWindow.value().addPass(1); + assertEquals(currentWindow.value().pass(), 1L); + + leapArray.addWaiting(currentTime + windowLengthInMs, 1); + assertEquals(leapArray.currentWaiting(), 1); + assertEquals(currentWindow.value().pass(), 1L); + + } + + @Test + public void testWindowInOneInterval() { + OccupiableBucketLeapArray leapArray = new OccupiableBucketLeapArray(sampleCount, intervalInMs); + long currentTime = TimeUtil.currentTimeMillis(); + + WindowWrap currentWindow = leapArray.currentWindow(currentTime); + currentWindow.value().addPass(1); + assertEquals(currentWindow.value().pass(), 1L); + + leapArray.addWaiting(currentTime + windowLengthInMs, 2); + assertEquals(leapArray.currentWaiting(), 2); + assertEquals(currentWindow.value().pass(), 1L); + + leapArray.currentWindow(currentTime + windowLengthInMs); + List values = leapArray.values(currentTime + windowLengthInMs); + assertEquals(values.size(), 2); + + long sum = 0; + for (MetricBucket bucket : values) { + sum += bucket.pass(); + } + assertEquals(sum, 3); + } + + @Test + public void testMultiThreadUpdateEmptyWindow() throws Exception { + final long time = TimeUtil.currentTimeMillis(); + final int nThreads = 16; + final OccupiableBucketLeapArray leapArray = new OccupiableBucketLeapArray(sampleCount, intervalInMs); + final CountDownLatch latch = new CountDownLatch(nThreads); + Runnable task = new Runnable() { + @Override + public void run() { + leapArray.currentWindow(time).value().addPass(1); + leapArray.addWaiting(time + windowLengthInMs, 1); + latch.countDown(); + } + }; + + for (int i = 0; i < nThreads; i++) { + new Thread(task).start(); + } + + latch.await(); + + assertEquals(nThreads, leapArray.currentWindow(time).value().pass()); + assertEquals(nThreads, leapArray.currentWaiting()); + + leapArray.currentWindow(time + windowLengthInMs); + long sum = 0; + List values = leapArray.values(time + windowLengthInMs); + for (MetricBucket bucket : values) { + sum += bucket.pass(); + } + assertEquals(values.size(), 2); + assertEquals(sum, nThreads * 2); + } + + @Test + public void testWindowAfterOneInterval() { + OccupiableBucketLeapArray leapArray = new OccupiableBucketLeapArray(sampleCount, intervalInMs); + long currentTime = TimeUtil.currentTimeMillis(); + + System.out.println(currentTime); + for (int i = 0; i < intervalInSec * 1000 / windowLengthInMs; i++) { + WindowWrap currentWindow = leapArray.currentWindow(currentTime + i * windowLengthInMs); + currentWindow.value().addPass(1); + leapArray.addWaiting(currentTime + (i + 1) * windowLengthInMs, 1); + System.out.println(currentTime + i * windowLengthInMs); + leapArray.debug(currentTime + i * windowLengthInMs); + } + + System.out.println(currentTime + intervalInSec * 1000); + List values = leapArray + .values(currentTime - currentTime % windowLengthInMs + intervalInSec * 1000); + leapArray.debug(currentTime + intervalInSec * 1000); + assertEquals(values.size(), intervalInSec * 1000 / windowLengthInMs); + + long sum = 0; + for (MetricBucket bucket : values) { + sum += bucket.pass(); + } + assertEquals(sum, 2 * intervalInSec * 1000 / windowLengthInMs - 1); + assertEquals(leapArray.currentWaiting(), 10); + } +} diff --git a/sentinel-extension/sentinel-parameter-flow-control/src/main/java/com/alibaba/csp/sentinel/slots/statistic/metric/HotParameterLeapArray.java b/sentinel-extension/sentinel-parameter-flow-control/src/main/java/com/alibaba/csp/sentinel/slots/statistic/metric/HotParameterLeapArray.java index b1fc7d94..164ae117 100644 --- a/sentinel-extension/sentinel-parameter-flow-control/src/main/java/com/alibaba/csp/sentinel/slots/statistic/metric/HotParameterLeapArray.java +++ b/sentinel-extension/sentinel-parameter-flow-control/src/main/java/com/alibaba/csp/sentinel/slots/statistic/metric/HotParameterLeapArray.java @@ -42,7 +42,7 @@ public class HotParameterLeapArray extends LeapArray { } @Override - public ParamMapBucket newEmptyBucket() { + public ParamMapBucket newEmptyBucket(long timeMillis) { return new ParamMapBucket(); }