> NAMESPACE_FLOW_ID_MAP = new ConcurrentHashMap<>();
+ /**
+ * This map (flowId, namespace) is used for getting connected count
+ * when checking a specific rule in {@code ruleId}:
+ *
+ *
+ * ruleId -> namespace -> connection group -> connected count
+ *
+ */
+ private static final Map FLOW_NAMESPACE_MAP = new ConcurrentHashMap<>();
+
+ /**
+ * (namespace, property-listener wrapper)
+ */
+ private static final Map> PROPERTY_MAP = new ConcurrentHashMap<>();
+ /**
+ * Cluster flow rule property supplier for a specific namespace.
+ */
+ private static volatile Function>> propertySupplier
+ = DEFAULT_PROPERTY_SUPPLIER;
+
+ private static final Object UPDATE_LOCK = new Object();
+
+ static {
+ initDefaultProperty();
+ }
+
+ private static void initDefaultProperty() {
+ // The server should always support default namespace,
+ // so register a default property for default namespace.
+ SentinelProperty> defaultProperty = new DynamicSentinelProperty<>();
+ String defaultNamespace = ServerConstants.DEFAULT_NAMESPACE;
+ registerPropertyInternal(defaultNamespace, defaultProperty);
+ }
+
+ public static void setPropertySupplier(Function>> propertySupplier) {
+ ClusterFlowRuleManager.propertySupplier = propertySupplier;
+ }
+
+ /**
+ * Listen to the {@link SentinelProperty} for cluster {@link FlowRule}s.
+ * The property is the source of cluster {@link FlowRule}s for a specific namespace.
+ *
+ * @param namespace namespace to register
+ */
+ public static void register2Property(String namespace) {
+ AssertUtil.notEmpty(namespace, "namespace cannot be empty");
+ if (propertySupplier == null) {
+ RecordLog.warn(
+ "[ClusterFlowRuleManager] Cluster flow property supplier is absent, cannot register property");
+ return;
+ }
+ SentinelProperty> property = propertySupplier.apply(namespace);
+ if (property == null) {
+ RecordLog.warn(
+ "[ClusterFlowRuleManager] Wrong created property from cluster flow property supplier, ignoring");
+ return;
+ }
+ synchronized (UPDATE_LOCK) {
+ RecordLog.info("[ClusterFlowRuleManager] Registering new property to cluster flow rule manager"
+ + " for namespace <{0}>", namespace);
+ registerPropertyInternal(namespace, property);
+ }
+ }
+
+ /**
+ * Listen to the {@link SentinelProperty} for cluster {@link FlowRule}s if current property for namespace is absent.
+ * The property is the source of cluster {@link FlowRule}s for a specific namespace.
+ *
+ * @param namespace namespace to register
+ */
+ public static void registerPropertyIfAbsent(String namespace) {
+ AssertUtil.notEmpty(namespace, "namespace cannot be empty");
+ if (!PROPERTY_MAP.containsKey(namespace)) {
+ synchronized (UPDATE_LOCK) {
+ if (!PROPERTY_MAP.containsKey(namespace)) {
+ register2Property(namespace);
+ }
+ }
+ }
+ }
+
+ private static void registerPropertyInternal(/*@NonNull*/ String namespace, /*@Valid*/
+ SentinelProperty> property) {
+ NamespaceFlowProperty oldProperty = PROPERTY_MAP.get(namespace);
+ if (oldProperty != null) {
+ oldProperty.getProperty().removeListener(oldProperty.getListener());
+ }
+ PropertyListener> listener = new FlowRulePropertyListener(namespace);
+ property.addListener(listener);
+ PROPERTY_MAP.put(namespace, new NamespaceFlowProperty<>(namespace, property, listener));
+ Set flowIdSet = NAMESPACE_FLOW_ID_MAP.get(namespace);
+ if (flowIdSet == null) {
+ resetNamespaceFlowIdMapFor(namespace);
+ }
+ }
+
+ /**
+ * Remove cluster flow rule property for a specific namespace.
+ *
+ * @param namespace valid namespace
+ */
+ public static void removeProperty(String namespace) {
+ AssertUtil.notEmpty(namespace, "namespace cannot be empty");
+ synchronized (UPDATE_LOCK) {
+ NamespaceFlowProperty property = PROPERTY_MAP.get(namespace);
+ if (property != null) {
+ property.getProperty().removeListener(property.getListener());
+ PROPERTY_MAP.remove(namespace);
+ }
+ RecordLog.info("[ClusterFlowRuleManager] Removing property from cluster flow rule manager"
+ + " for namespace <{0}>", namespace);
+ }
+ }
+
+ private static void removePropertyListeners() {
+ for (NamespaceFlowProperty property : PROPERTY_MAP.values()) {
+ property.getProperty().removeListener(property.getListener());
+ }
+ }
+
+ private static void restorePropertyListeners() {
+ for (NamespaceFlowProperty p : PROPERTY_MAP.values()) {
+ p.getProperty().removeListener(p.getListener());
+ p.getProperty().addListener(p.getListener());
+ }
+ }
+
+ /**
+ * Get flow rule by rule ID.
+ *
+ * @param id rule ID
+ * @return flow rule
+ */
+ public static FlowRule getFlowRuleById(Long id) {
+ if (!ClusterRuleUtil.validId(id)) {
+ return null;
+ }
+ return FLOW_RULES.get(id);
+ }
+
+ public static List getAllFlowRules() {
+ return new ArrayList<>(FLOW_RULES.values());
+ }
+
+ /**
+ * Get all cluster flow rules within a specific namespace.
+ *
+ * @param namespace valid namespace
+ * @return cluster flow rules within the provided namespace
+ */
+ public static List getFlowRules(String namespace) {
+ if (StringUtil.isEmpty(namespace)) {
+ return new ArrayList<>();
+ }
+ List rules = new ArrayList<>();
+ Set flowIdSet = NAMESPACE_FLOW_ID_MAP.get(namespace);
+ if (flowIdSet == null || flowIdSet.isEmpty()) {
+ return rules;
+ }
+ for (Long flowId : flowIdSet) {
+ FlowRule rule = FLOW_RULES.get(flowId);
+ if (rule != null) {
+ rules.add(rule);
+ }
+ }
+ return rules;
+ }
+
+ /**
+ * Load flow rules for a specific namespace. The former rules of the namespace will be replaced.
+ *
+ * @param namespace a valid namespace
+ * @param rules rule list
+ */
+ public static void loadRules(String namespace, List rules) {
+ AssertUtil.notEmpty(namespace, "namespace cannot be empty");
+ NamespaceFlowProperty property = PROPERTY_MAP.get(namespace);
+ if (property != null) {
+ property.getProperty().updateValue(rules);
+ }
+ }
+
+ private static void resetNamespaceFlowIdMapFor(/*@Valid*/ String namespace) {
+ NAMESPACE_FLOW_ID_MAP.put(namespace, new HashSet());
+ }
+
+ /**
+ * Clear all rules of the provided namespace and reset map.
+ *
+ * @param namespace valid namespace
+ */
+ private static void clearAndResetRulesFor(/*@Valid*/ String namespace) {
+ Set flowIdSet = NAMESPACE_FLOW_ID_MAP.get(namespace);
+ if (flowIdSet != null && !flowIdSet.isEmpty()) {
+ for (Long flowId : flowIdSet) {
+ FLOW_RULES.remove(flowId);
+ FLOW_NAMESPACE_MAP.remove(flowId);
+ }
+ flowIdSet.clear();
+ } else {
+ resetNamespaceFlowIdMapFor(namespace);
+ }
+ }
+
+ private static void clearAndResetRulesConditional(/*@Valid*/ String namespace, Predicate predicate) {
+ Set oldIdSet = NAMESPACE_FLOW_ID_MAP.get(namespace);
+ if (oldIdSet != null && !oldIdSet.isEmpty()) {
+ for (Long flowId : oldIdSet) {
+ if (predicate.test(flowId)) {
+ FLOW_RULES.remove(flowId);
+ FLOW_NAMESPACE_MAP.remove(flowId);
+ ClusterMetricStatistics.removeMetric(flowId);
+ }
+ }
+ oldIdSet.clear();
+ }
+ }
+
+ /**
+ * Get connected count for associated namespace of given {@code flowId}.
+ *
+ * @param flowId unique flow ID
+ * @return connected count
+ */
+ public static int getConnectedCount(long flowId) {
+ if (flowId <= 0) {
+ return 0;
+ }
+ String namespace = FLOW_NAMESPACE_MAP.get(flowId);
+ if (namespace == null) {
+ return 0;
+ }
+ return ConnectionManager.getConnectedCount(namespace);
+ }
+
+ private static void applyClusterFlowRule(List list, /*@Valid*/ String namespace) {
+ if (list == null || list.isEmpty()) {
+ clearAndResetRulesFor(namespace);
+ return;
+ }
+ final ConcurrentHashMap ruleMap = new ConcurrentHashMap<>();
+
+ Set flowIdSet = new HashSet<>();
+
+ for (FlowRule rule : list) {
+ if (!rule.isClusterMode()) {
+ continue;
+ }
+ if (!FlowRuleUtil.isValidRule(rule)) {
+ RecordLog.warn(
+ "[ClusterFlowRuleManager] Ignoring invalid flow rule when loading new flow rules: " + rule);
+ continue;
+ }
+ if (StringUtil.isBlank(rule.getLimitApp())) {
+ rule.setLimitApp(RuleConstant.LIMIT_APP_DEFAULT);
+ }
+
+ // Flow id should not be null after filtered.
+ Long flowId = rule.getClusterConfig().getFlowId();
+ if (flowId == null) {
+ continue;
+ }
+ ruleMap.put(flowId, rule);
+ FLOW_NAMESPACE_MAP.put(flowId, namespace);
+ flowIdSet.add(flowId);
+
+ // Prepare cluster metric from valid flow ID.
+ ClusterMetricStatistics.putMetricIfAbsent(flowId,
+ new ClusterMetric(ClusterServerConfigManager.getSampleCount(),
+ ClusterServerConfigManager.getIntervalMs()));
+ }
+
+ // Cleanup unused cluster metrics.
+ clearAndResetRulesConditional(namespace, new Predicate() {
+ @Override
+ public boolean test(Long flowId) {
+ return !ruleMap.containsKey(flowId);
+ }
+ });
+
+ FLOW_RULES.putAll(ruleMap);
+ NAMESPACE_FLOW_ID_MAP.put(namespace, flowIdSet);
+ }
+
+ private static final class FlowRulePropertyListener implements PropertyListener> {
+
+ private final String namespace;
+
+ public FlowRulePropertyListener(String namespace) {
+ this.namespace = namespace;
+ }
+
+ @Override
+ public synchronized void configUpdate(List conf) {
+ applyClusterFlowRule(conf, namespace);
+ RecordLog.info("[ClusterFlowRuleManager] Cluster flow rules received for namespace <{0}>: {1}",
+ namespace, FLOW_RULES);
+ }
+
+ @Override
+ public synchronized void configLoad(List conf) {
+ applyClusterFlowRule(conf, namespace);
+ RecordLog.info("[ClusterFlowRuleManager] Cluster flow rules loaded for namespace <{0}>: {1}",
+ namespace, FLOW_RULES);
+ }
+ }
+
+ private ClusterFlowRuleManager() {}
+}
diff --git a/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/flow/rule/ClusterParamFlowRuleManager.java b/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/flow/rule/ClusterParamFlowRuleManager.java
new file mode 100644
index 00000000..c81393d5
--- /dev/null
+++ b/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/flow/rule/ClusterParamFlowRuleManager.java
@@ -0,0 +1,356 @@
+/*
+ * Copyright 1999-2018 Alibaba Group Holding Ltd.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.csp.sentinel.cluster.flow.rule;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+import com.alibaba.csp.sentinel.cluster.flow.statistic.ClusterParamMetricStatistics;
+import com.alibaba.csp.sentinel.cluster.flow.statistic.metric.ClusterParamMetric;
+import com.alibaba.csp.sentinel.cluster.server.ServerConstants;
+import com.alibaba.csp.sentinel.cluster.server.config.ClusterServerConfigManager;
+import com.alibaba.csp.sentinel.cluster.server.connection.ConnectionManager;
+import com.alibaba.csp.sentinel.cluster.server.util.ClusterRuleUtil;
+import com.alibaba.csp.sentinel.log.RecordLog;
+import com.alibaba.csp.sentinel.property.DynamicSentinelProperty;
+import com.alibaba.csp.sentinel.property.PropertyListener;
+import com.alibaba.csp.sentinel.property.SentinelProperty;
+import com.alibaba.csp.sentinel.slots.block.RuleConstant;
+import com.alibaba.csp.sentinel.slots.block.flow.param.ParamFlowRule;
+import com.alibaba.csp.sentinel.slots.block.flow.param.ParamFlowRuleUtil;
+import com.alibaba.csp.sentinel.util.AssertUtil;
+import com.alibaba.csp.sentinel.util.StringUtil;
+import com.alibaba.csp.sentinel.util.function.Function;
+import com.alibaba.csp.sentinel.util.function.Predicate;
+
+/**
+ * Manager for cluster parameter flow rules.
+ *
+ * @author Eric Zhao
+ * @since 1.4.0
+ */
+public final class ClusterParamFlowRuleManager {
+
+ /**
+ * The default cluster parameter flow rule property supplier that creates a new
+ * dynamic property for a specific namespace to manually do rule management.
+ */
+ public static final Function>> DEFAULT_PROPERTY_SUPPLIER =
+ new Function>>() {
+ @Override
+ public SentinelProperty> apply(String namespace) {
+ return new DynamicSentinelProperty<>();
+ }
+ };
+
+ /**
+ * (id, clusterParamRule)
+ */
+ private static final Map PARAM_RULES = new ConcurrentHashMap<>();
+ /**
+ * (namespace, [flowId...])
+ */
+ private static final Map> NAMESPACE_FLOW_ID_MAP = new ConcurrentHashMap<>();
+ /**
+ * (flowId, namespace)
+ */
+ private static final Map FLOW_NAMESPACE_MAP = new ConcurrentHashMap<>();
+
+ /**
+ * (namespace, property-listener wrapper)
+ */
+ private static final Map> PROPERTY_MAP = new ConcurrentHashMap<>();
+ /**
+ * Cluster parameter flow rule property supplier for a specific namespace.
+ */
+ private static volatile Function>> propertySupplier
+ = DEFAULT_PROPERTY_SUPPLIER;
+
+ private static final Object UPDATE_LOCK = new Object();
+
+ static {
+ initDefaultProperty();
+ }
+
+ private static void initDefaultProperty() {
+ SentinelProperty> defaultProperty = new DynamicSentinelProperty<>();
+ String defaultNamespace = ServerConstants.DEFAULT_NAMESPACE;
+ registerPropertyInternal(defaultNamespace, defaultProperty);
+ }
+
+ public static void setPropertySupplier(
+ Function>> propertySupplier) {
+ ClusterParamFlowRuleManager.propertySupplier = propertySupplier;
+ }
+
+ /**
+ * Listen to the {@link SentinelProperty} for cluster {@link ParamFlowRule}s.
+ * The property is the source of cluster {@link ParamFlowRule}s for a specific namespace.
+ *
+ * @param namespace namespace to register
+ */
+ public static void register2Property(String namespace) {
+ AssertUtil.notEmpty(namespace, "namespace cannot be empty");
+ if (propertySupplier == null) {
+ RecordLog.warn(
+ "[ClusterParamFlowRuleManager] Cluster param rule property supplier is absent, cannot register "
+ + "property");
+ return;
+ }
+ SentinelProperty> property = propertySupplier.apply(namespace);
+ if (property == null) {
+ RecordLog.warn(
+ "[ClusterParamFlowRuleManager] Wrong created property from cluster param rule property supplier, "
+ + "ignoring");
+ return;
+ }
+ synchronized (UPDATE_LOCK) {
+ RecordLog.info("[ClusterParamFlowRuleManager] Registering new property to cluster param rule manager"
+ + " for namespace <{0}>", namespace);
+ registerPropertyInternal(namespace, property);
+ }
+ }
+
+ public static void registerPropertyIfAbsent(String namespace) {
+ AssertUtil.notEmpty(namespace, "namespace cannot be empty");
+ if (!PROPERTY_MAP.containsKey(namespace)) {
+ synchronized (UPDATE_LOCK) {
+ if (!PROPERTY_MAP.containsKey(namespace)) {
+ register2Property(namespace);
+ }
+ }
+ }
+ }
+
+ private static void registerPropertyInternal(/*@NonNull*/ String namespace, /*@Valid*/
+ SentinelProperty> property) {
+ NamespaceFlowProperty oldProperty = PROPERTY_MAP.get(namespace);
+ if (oldProperty != null) {
+ oldProperty.getProperty().removeListener(oldProperty.getListener());
+ }
+ PropertyListener> listener = new ParamRulePropertyListener(namespace);
+ property.addListener(listener);
+ PROPERTY_MAP.put(namespace, new NamespaceFlowProperty<>(namespace, property, listener));
+ Set flowIdSet = NAMESPACE_FLOW_ID_MAP.get(namespace);
+ if (flowIdSet == null) {
+ resetNamespaceFlowIdMapFor(namespace);
+ }
+ }
+
+ public static void removeProperty(String namespace) {
+ AssertUtil.notEmpty(namespace, "namespace cannot be empty");
+ synchronized (UPDATE_LOCK) {
+ NamespaceFlowProperty property = PROPERTY_MAP.get(namespace);
+ if (property != null) {
+ property.getProperty().removeListener(property.getListener());
+ PROPERTY_MAP.remove(namespace);
+ }
+ RecordLog.info("[ClusterParamFlowRuleManager] Removing property from cluster flow rule manager"
+ + " for namespace <{0}>", namespace);
+ }
+ }
+
+ private static void removePropertyListeners() {
+ for (NamespaceFlowProperty property : PROPERTY_MAP.values()) {
+ property.getProperty().removeListener(property.getListener());
+ }
+ }
+
+ private static void restorePropertyListeners() {
+ for (NamespaceFlowProperty p : PROPERTY_MAP.values()) {
+ p.getProperty().removeListener(p.getListener());
+ p.getProperty().addListener(p.getListener());
+ }
+ }
+
+ private static void resetNamespaceFlowIdMapFor(/*@Valid*/ String namespace) {
+ NAMESPACE_FLOW_ID_MAP.put(namespace, new HashSet());
+ }
+
+ private static void clearAndResetRulesFor(/*@Valid*/ String namespace) {
+ Set flowIdSet = NAMESPACE_FLOW_ID_MAP.get(namespace);
+ if (flowIdSet != null && !flowIdSet.isEmpty()) {
+ for (Long flowId : flowIdSet) {
+ PARAM_RULES.remove(flowId);
+ FLOW_NAMESPACE_MAP.remove(flowId);
+ }
+ flowIdSet.clear();
+ } else {
+ resetNamespaceFlowIdMapFor(namespace);
+ }
+ }
+
+ private static void clearAndResetRulesConditional(/*@Valid*/ String namespace, Predicate predicate) {
+ Set oldIdSet = NAMESPACE_FLOW_ID_MAP.get(namespace);
+ if (oldIdSet != null && !oldIdSet.isEmpty()) {
+ for (Long flowId : oldIdSet) {
+ if (predicate.test(flowId)) {
+ PARAM_RULES.remove(flowId);
+ FLOW_NAMESPACE_MAP.remove(flowId);
+ ClusterParamMetricStatistics.removeMetric(flowId);
+ }
+ }
+ oldIdSet.clear();
+ }
+ }
+
+ public static ParamFlowRule getParamRuleById(Long id) {
+ if (!ClusterRuleUtil.validId(id)) {
+ return null;
+ }
+ return PARAM_RULES.get(id);
+ }
+
+ public static List getAllParamRules() {
+ return new ArrayList<>(PARAM_RULES.values());
+ }
+
+ /**
+ * Get all cluster parameter flow rules within a specific namespace.
+ *
+ * @param namespace a valid namespace
+ * @return cluster parameter flow rules within the provided namespace
+ */
+ public static List getParamRules(String namespace) {
+ if (StringUtil.isEmpty(namespace)) {
+ return new ArrayList<>();
+ }
+ List rules = new ArrayList<>();
+ Set flowIdSet = NAMESPACE_FLOW_ID_MAP.get(namespace);
+ if (flowIdSet == null || flowIdSet.isEmpty()) {
+ return rules;
+ }
+ for (Long flowId : flowIdSet) {
+ ParamFlowRule rule = PARAM_RULES.get(flowId);
+ if (rule != null) {
+ rules.add(rule);
+ }
+ }
+ return rules;
+ }
+
+ /**
+ * Load parameter flow rules for a specific namespace. The former rules of the namespace will be replaced.
+ *
+ * @param namespace a valid namespace
+ * @param rules rule list
+ */
+ public static void loadRules(String namespace, List rules) {
+ AssertUtil.notEmpty(namespace, "namespace cannot be empty");
+ NamespaceFlowProperty property = PROPERTY_MAP.get(namespace);
+ if (property != null) {
+ property.getProperty().updateValue(rules);
+ }
+ }
+
+ /**
+ * Get connected count for associated namespace of given {@code flowId}.
+ *
+ * @param flowId existing rule ID
+ * @return connected count
+ */
+ public static int getConnectedCount(long flowId) {
+ if (flowId <= 0) {
+ return 0;
+ }
+ String namespace = FLOW_NAMESPACE_MAP.get(flowId);
+ if (namespace == null) {
+ return 0;
+ }
+ return ConnectionManager.getConnectedCount(namespace);
+ }
+
+ private static class ParamRulePropertyListener implements PropertyListener> {
+
+ private final String namespace;
+
+ public ParamRulePropertyListener(String namespace) {
+ this.namespace = namespace;
+ }
+
+ @Override
+ public void configLoad(List conf) {
+ applyClusterParamRules(conf, namespace);
+ RecordLog.info("[ClusterParamFlowRuleManager] Cluster parameter rules loaded for namespace <{0}>: {1}",
+ namespace, PARAM_RULES);
+ }
+
+ @Override
+ public void configUpdate(List conf) {
+ applyClusterParamRules(conf, namespace);
+ RecordLog.info("[ClusterParamFlowRuleManager] Cluster parameter rules received for namespace <{0}>: {1}",
+ namespace, PARAM_RULES);
+ }
+ }
+
+ private static void applyClusterParamRules(List list, /*@Valid*/ String namespace) {
+ if (list == null || list.isEmpty()) {
+ clearAndResetRulesFor(namespace);
+ return;
+ }
+ final ConcurrentHashMap ruleMap = new ConcurrentHashMap<>();
+
+ Set flowIdSet = new HashSet<>();
+
+ for (ParamFlowRule rule : list) {
+ if (!rule.isClusterMode()) {
+ continue;
+ }
+ if (!ParamFlowRuleUtil.isValidRule(rule)) {
+ RecordLog.warn(
+ "[ClusterParamFlowRuleManager] Ignoring invalid param flow rule when loading new flow rules: "
+ + rule);
+ continue;
+ }
+ if (StringUtil.isBlank(rule.getLimitApp())) {
+ rule.setLimitApp(RuleConstant.LIMIT_APP_DEFAULT);
+ }
+
+ ParamFlowRuleUtil.fillExceptionFlowItems(rule);
+
+ // Flow id should not be null after filtered.
+ Long flowId = rule.getClusterConfig().getFlowId();
+ if (flowId == null) {
+ continue;
+ }
+ ruleMap.put(flowId, rule);
+ FLOW_NAMESPACE_MAP.put(flowId, namespace);
+ flowIdSet.add(flowId);
+
+ // Prepare cluster parameter metric from valid rule ID.
+ ClusterParamMetricStatistics.putMetricIfAbsent(flowId,
+ new ClusterParamMetric(ClusterServerConfigManager.getSampleCount(),
+ ClusterServerConfigManager.getIntervalMs()));
+ }
+
+ // Cleanup unused cluster parameter metrics.
+ clearAndResetRulesConditional(namespace, new Predicate() {
+ @Override
+ public boolean test(Long flowId) {
+ return !ruleMap.containsKey(flowId);
+ }
+ });
+
+ PARAM_RULES.putAll(ruleMap);
+ NAMESPACE_FLOW_ID_MAP.put(namespace, flowIdSet);
+ }
+
+ private ClusterParamFlowRuleManager() {}
+}
diff --git a/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/flow/rule/NamespaceFlowProperty.java b/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/flow/rule/NamespaceFlowProperty.java
new file mode 100644
index 00000000..37732b43
--- /dev/null
+++ b/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/flow/rule/NamespaceFlowProperty.java
@@ -0,0 +1,56 @@
+/*
+ * Copyright 1999-2018 Alibaba Group Holding Ltd.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.csp.sentinel.cluster.flow.rule;
+
+import java.util.List;
+
+import com.alibaba.csp.sentinel.property.PropertyListener;
+import com.alibaba.csp.sentinel.property.SentinelProperty;
+
+/**
+ * A property wrapper for list of rules of a given namespace.
+ * This is useful for auto-management of the property and listener.
+ *
+ * @param type of the rule
+ * @author Eric Zhao
+ * @since 1.4.0
+ */
+class NamespaceFlowProperty {
+
+ private final String namespace;
+ private final SentinelProperty> property;
+ private final PropertyListener> listener;
+
+ public NamespaceFlowProperty(String namespace,
+ SentinelProperty> property,
+ PropertyListener> listener) {
+ this.namespace = namespace;
+ this.property = property;
+ this.listener = listener;
+ }
+
+ public SentinelProperty> getProperty() {
+ return property;
+ }
+
+ public String getNamespace() {
+ return namespace;
+ }
+
+ public PropertyListener> getListener() {
+ return listener;
+ }
+}
diff --git a/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/flow/statistic/ClusterMetricStatistics.java b/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/flow/statistic/ClusterMetricStatistics.java
new file mode 100644
index 00000000..e574f7dc
--- /dev/null
+++ b/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/flow/statistic/ClusterMetricStatistics.java
@@ -0,0 +1,69 @@
+/*
+ * Copyright 1999-2018 Alibaba Group Holding Ltd.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.csp.sentinel.cluster.flow.statistic;
+
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+import com.alibaba.csp.sentinel.cluster.flow.statistic.metric.ClusterMetric;
+import com.alibaba.csp.sentinel.cluster.server.config.ClusterServerConfigManager;
+import com.alibaba.csp.sentinel.util.AssertUtil;
+
+/**
+ * @author Eric Zhao
+ * @since 1.4.0
+ */
+public final class ClusterMetricStatistics {
+
+ private static final Map METRIC_MAP = new ConcurrentHashMap<>();
+
+ public static void clear() {
+ METRIC_MAP.clear();
+ }
+
+ public static void putMetric(long id, ClusterMetric metric) {
+ AssertUtil.notNull(metric, "Cluster metric cannot be null");
+ METRIC_MAP.put(id, metric);
+ }
+
+ public static boolean putMetricIfAbsent(long id, ClusterMetric metric) {
+ AssertUtil.notNull(metric, "Cluster metric cannot be null");
+ if (METRIC_MAP.containsKey(id)) {
+ return false;
+ }
+ METRIC_MAP.put(id, metric);
+ return true;
+ }
+
+ public static void removeMetric(long id) {
+ METRIC_MAP.remove(id);
+ }
+
+ public static ClusterMetric getMetric(long id) {
+ return METRIC_MAP.get(id);
+ }
+
+ public static void resetFlowMetrics() {
+ Set keySet = METRIC_MAP.keySet();
+ for (Long id : keySet) {
+ METRIC_MAP.put(id, new ClusterMetric(ClusterServerConfigManager.getSampleCount(),
+ ClusterServerConfigManager.getIntervalMs()));
+ }
+ }
+
+ private ClusterMetricStatistics() {}
+}
diff --git a/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/flow/statistic/ClusterParamMetricStatistics.java b/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/flow/statistic/ClusterParamMetricStatistics.java
new file mode 100644
index 00000000..73632493
--- /dev/null
+++ b/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/flow/statistic/ClusterParamMetricStatistics.java
@@ -0,0 +1,69 @@
+/*
+ * Copyright 1999-2018 Alibaba Group Holding Ltd.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.csp.sentinel.cluster.flow.statistic;
+
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+import com.alibaba.csp.sentinel.cluster.flow.statistic.metric.ClusterParamMetric;
+import com.alibaba.csp.sentinel.cluster.server.config.ClusterServerConfigManager;
+import com.alibaba.csp.sentinel.util.AssertUtil;
+
+/**
+ * @author Eric Zhao
+ * @since 1.4.0
+ */
+public final class ClusterParamMetricStatistics {
+
+ private static final Map METRIC_MAP = new ConcurrentHashMap<>();
+
+ public static void clear() {
+ METRIC_MAP.clear();
+ }
+
+ public static void putMetric(long id, ClusterParamMetric metric) {
+ AssertUtil.notNull(metric, "metric cannot be null");
+ METRIC_MAP.put(id, metric);
+ }
+
+ public static boolean putMetricIfAbsent(long id, ClusterParamMetric metric) {
+ AssertUtil.notNull(metric, "metric cannot be null");
+ if (METRIC_MAP.containsKey(id)) {
+ return false;
+ }
+ METRIC_MAP.put(id, metric);
+ return true;
+ }
+
+ public static void removeMetric(long id) {
+ METRIC_MAP.remove(id);
+ }
+
+ public static ClusterParamMetric getMetric(long id) {
+ return METRIC_MAP.get(id);
+ }
+
+ public static void resetFlowMetrics() {
+ Set keySet = METRIC_MAP.keySet();
+ for (Long id : keySet) {
+ METRIC_MAP.put(id, new ClusterParamMetric(ClusterServerConfigManager.getSampleCount(),
+ ClusterServerConfigManager.getIntervalMs()));
+ }
+ }
+
+ private ClusterParamMetricStatistics() {}
+}
diff --git a/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/flow/statistic/data/ClusterFlowEvent.java b/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/flow/statistic/data/ClusterFlowEvent.java
new file mode 100644
index 00000000..8b79094f
--- /dev/null
+++ b/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/flow/statistic/data/ClusterFlowEvent.java
@@ -0,0 +1,52 @@
+/*
+ * Copyright 1999-2018 Alibaba Group Holding Ltd.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.csp.sentinel.cluster.flow.statistic.data;
+
+/**
+ * @author Eric Zhao
+ * @since 1.4.0
+ */
+public enum ClusterFlowEvent {
+
+ /**
+ * Normal pass.
+ */
+ PASS,
+ /**
+ * Normal block.
+ */
+ BLOCK,
+ /**
+ * Token request (from client) passed.
+ */
+ PASS_REQUEST,
+ /**
+ * Token request (from client) blocked.
+ */
+ BLOCK_REQUEST,
+ /**
+ * Pass (pre-occupy incoming buckets).
+ */
+ OCCUPIED_PASS,
+ /**
+ * Block (pre-occupy incoming buckets failed).
+ */
+ OCCUPIED_BLOCK,
+ /**
+ * Waiting due to flow shaping or for next bucket tick.
+ */
+ WAITING
+}
diff --git a/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/flow/statistic/data/ClusterMetricBucket.java b/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/flow/statistic/data/ClusterMetricBucket.java
new file mode 100644
index 00000000..a27dac5a
--- /dev/null
+++ b/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/flow/statistic/data/ClusterMetricBucket.java
@@ -0,0 +1,50 @@
+/*
+ * Copyright 1999-2018 Alibaba Group Holding Ltd.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.csp.sentinel.cluster.flow.statistic.data;
+
+import com.alibaba.csp.sentinel.slots.statistic.base.LongAdder;
+
+/**
+ * @author Eric Zhao
+ * @since 1.4.0
+ */
+public class ClusterMetricBucket {
+
+ private final LongAdder[] counters;
+
+ public ClusterMetricBucket() {
+ ClusterFlowEvent[] events = ClusterFlowEvent.values();
+ this.counters = new LongAdder[events.length];
+ for (ClusterFlowEvent event : events) {
+ counters[event.ordinal()] = new LongAdder();
+ }
+ }
+
+ public void reset() {
+ for (ClusterFlowEvent event : ClusterFlowEvent.values()) {
+ counters[event.ordinal()].reset();
+ }
+ }
+
+ public long get(ClusterFlowEvent event) {
+ return counters[event.ordinal()].sum();
+ }
+
+ public ClusterMetricBucket add(ClusterFlowEvent event, long count) {
+ counters[event.ordinal()].add(count);
+ return this;
+ }
+}
diff --git a/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/flow/statistic/metric/ClusterMetric.java b/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/flow/statistic/metric/ClusterMetric.java
new file mode 100644
index 00000000..eea3c459
--- /dev/null
+++ b/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/flow/statistic/metric/ClusterMetric.java
@@ -0,0 +1,100 @@
+/*
+ * Copyright 1999-2018 Alibaba Group Holding Ltd.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.csp.sentinel.cluster.flow.statistic.metric;
+
+import java.util.List;
+
+import com.alibaba.csp.sentinel.cluster.flow.statistic.data.ClusterFlowEvent;
+import com.alibaba.csp.sentinel.cluster.flow.statistic.data.ClusterMetricBucket;
+import com.alibaba.csp.sentinel.util.AssertUtil;
+
+/**
+ * @author Eric Zhao
+ * @since 1.4.0
+ */
+public class ClusterMetric {
+
+ private final ClusterMetricLeapArray metric;
+
+ public ClusterMetric(int sampleCount, int intervalInMs) {
+ AssertUtil.isTrue(sampleCount > 0, "sampleCount should be positive");
+ AssertUtil.isTrue(intervalInMs > 0, "interval should be positive");
+ AssertUtil.isTrue(intervalInMs % sampleCount == 0, "time span needs to be evenly divided");
+ int windowLengthInMs = intervalInMs / sampleCount;
+ this.metric = new ClusterMetricLeapArray(windowLengthInMs, intervalInMs);
+ }
+
+ public void add(ClusterFlowEvent event, long count) {
+ metric.currentWindow().value().add(event, count);
+ }
+
+ public long getCurrentCount(ClusterFlowEvent event) {
+ return metric.currentWindow().value().get(event);
+ }
+
+ /**
+ * Get total sum for provided event in {@code intervalInSec}.
+ *
+ * @param event event to calculate
+ * @return total sum for event
+ */
+ public long getSum(ClusterFlowEvent event) {
+ metric.currentWindow();
+ long sum = 0;
+
+ List buckets = metric.values();
+ for (ClusterMetricBucket bucket : buckets) {
+ sum += bucket.get(event);
+ }
+ return sum;
+ }
+
+ /**
+ * Get average count for provided event per second.
+ *
+ * @param event event to calculate
+ * @return average count per second for event
+ */
+ public double getAvg(ClusterFlowEvent event) {
+ return getSum(event) / metric.getIntervalInSecond();
+ }
+
+ /**
+ * Try to pre-occupy upcoming buckets.
+ *
+ * @return time to wait for next bucket (in ms); 0 if cannot occupy next buckets
+ */
+ public int tryOccupyNext(ClusterFlowEvent event, int acquireCount, double threshold) {
+ double latestQps = getAvg(ClusterFlowEvent.PASS);
+ if (!canOccupy(event, acquireCount, latestQps, threshold)) {
+ return 0;
+ }
+ metric.addOccupyPass(acquireCount);
+ add(ClusterFlowEvent.WAITING, acquireCount);
+ return 1000 / metric.getSampleCount();
+ }
+
+ private boolean canOccupy(ClusterFlowEvent event, int acquireCount, double latestQps, double threshold) {
+ long headPass = metric.getFirstCountOfWindow(event);
+ long occupiedCount = metric.getOccupiedCount(event);
+ // bucket to occupy (= incoming bucket)
+ // ↓
+ // | head bucket | | | | current bucket |
+ // +-------------+----+----+----+----------- ----+
+ // (headPass)
+ return latestQps + (acquireCount + occupiedCount) - headPass <= threshold;
+ }
+}
diff --git a/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/flow/statistic/metric/ClusterMetricLeapArray.java b/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/flow/statistic/metric/ClusterMetricLeapArray.java
new file mode 100644
index 00000000..40d2d752
--- /dev/null
+++ b/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/flow/statistic/metric/ClusterMetricLeapArray.java
@@ -0,0 +1,98 @@
+/*
+ * Copyright 1999-2018 Alibaba Group Holding Ltd.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.csp.sentinel.cluster.flow.statistic.metric;
+
+import com.alibaba.csp.sentinel.cluster.flow.statistic.data.ClusterFlowEvent;
+import com.alibaba.csp.sentinel.cluster.flow.statistic.data.ClusterMetricBucket;
+import com.alibaba.csp.sentinel.slots.statistic.base.LeapArray;
+import com.alibaba.csp.sentinel.slots.statistic.base.LongAdder;
+import com.alibaba.csp.sentinel.slots.statistic.base.WindowWrap;
+
+/**
+ * @author Eric Zhao
+ * @since 1.4.0
+ */
+public class ClusterMetricLeapArray extends LeapArray {
+
+ private final LongAdder[] occupyCounter;
+ private boolean hasOccupied = false;
+
+ /**
+ * The total bucket count is: {@link #sampleCount} = intervalInMs / windowLengthInMs.
+ *
+ * @param windowLengthInMs a single window bucket's time length in milliseconds.
+ * @param intervalInMs the total time span of this {@link LeapArray} in milliseconds.
+ */
+ public ClusterMetricLeapArray(int windowLengthInMs, int intervalInMs) {
+ super(windowLengthInMs, intervalInMs / 1000);
+ ClusterFlowEvent[] events = ClusterFlowEvent.values();
+ this.occupyCounter = new LongAdder[events.length];
+ for (ClusterFlowEvent event : events) {
+ occupyCounter[event.ordinal()] = new LongAdder();
+ }
+ }
+
+ @Override
+ public ClusterMetricBucket newEmptyBucket() {
+ return new ClusterMetricBucket();
+ }
+
+ @Override
+ protected WindowWrap resetWindowTo(WindowWrap w, long startTime) {
+ w.resetTo(startTime);
+ w.value().reset();
+ transferOccupyToBucket(w.value());
+ return w;
+ }
+
+ private void transferOccupyToBucket(/*@Valid*/ ClusterMetricBucket bucket) {
+ if (hasOccupied) {
+ transferOccupiedCount(bucket, ClusterFlowEvent.PASS, ClusterFlowEvent.OCCUPIED_PASS);
+ transferOccupiedThenReset(bucket, ClusterFlowEvent.PASS);
+ transferOccupiedThenReset(bucket, ClusterFlowEvent.PASS_REQUEST);
+ hasOccupied = false;
+ }
+ }
+
+ private void transferOccupiedCount(ClusterMetricBucket bucket, ClusterFlowEvent source, ClusterFlowEvent target) {
+ bucket.add(target, occupyCounter[source.ordinal()].sum());
+ }
+
+ private void transferOccupiedThenReset(ClusterMetricBucket bucket, ClusterFlowEvent event) {
+ bucket.add(event, occupyCounter[event.ordinal()].sumThenReset());
+ }
+
+ public void addOccupyPass(int count) {
+ occupyCounter[ClusterFlowEvent.PASS.ordinal()].add(count);
+ occupyCounter[ClusterFlowEvent.PASS_REQUEST.ordinal()].add(1);
+ this.hasOccupied = true;
+ }
+
+ public long getOccupiedCount(ClusterFlowEvent event) {
+ return occupyCounter[event.ordinal()].sum();
+ }
+
+ public long getFirstCountOfWindow(ClusterFlowEvent event) {
+ if (event == null) {
+ return 0;
+ }
+ WindowWrap windowWrap = getValidHead();
+ if (windowWrap == null) {
+ return 0;
+ }
+ return windowWrap.value().get(event);
+ }
+}
diff --git a/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/flow/statistic/metric/ClusterParamMetric.java b/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/flow/statistic/metric/ClusterParamMetric.java
new file mode 100644
index 00000000..4daf7e41
--- /dev/null
+++ b/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/flow/statistic/metric/ClusterParamMetric.java
@@ -0,0 +1,83 @@
+/*
+ * Copyright 1999-2018 Alibaba Group Holding Ltd.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.csp.sentinel.cluster.flow.statistic.metric;
+
+import java.util.List;
+
+import com.alibaba.csp.sentinel.slots.statistic.base.LongAdder;
+import com.alibaba.csp.sentinel.slots.statistic.cache.CacheMap;
+import com.alibaba.csp.sentinel.util.AssertUtil;
+
+/**
+ * @author Eric Zhao
+ * @since 1.4.0
+ */
+public class ClusterParamMetric {
+
+ public static final int DEFAULT_CLUSTER_MAX_CAPACITY = 4000;
+
+ private final ClusterParameterLeapArray metric;
+
+ public ClusterParamMetric(int sampleCount, int intervalInMs) {
+ this(sampleCount, intervalInMs, DEFAULT_CLUSTER_MAX_CAPACITY);
+ }
+
+ public ClusterParamMetric(int sampleCount, int intervalInMs, int maxCapacity) {
+ AssertUtil.isTrue(sampleCount > 0, "sampleCount should be positive");
+ AssertUtil.isTrue(intervalInMs > 0, "interval should be positive");
+ AssertUtil.isTrue(intervalInMs % sampleCount == 0, "time span needs to be evenly divided");
+ int windowLengthInMs = intervalInMs / sampleCount;
+ this.metric = new ClusterParameterLeapArray<>(windowLengthInMs, intervalInMs, maxCapacity);
+ }
+
+ public long getSum(Object value) {
+ if (value == null) {
+ return 0;
+ }
+
+ metric.currentWindow();
+ long sum = 0;
+
+ List> buckets = metric.values();
+ for (CacheMap