> dataSource = new ConsulDataSource<>(host, port, ruleKey, waitTimeoutInSecond, flowConfigParser);
+FlowRuleManager.register2Property(dataSource.getProperty());
+```
+
+- `ruleKey`: the rule persistence key
+- `waitTimeoutInSecond`: long polling timeout (in second) of the Consul API client
diff --git a/sentinel-extension/sentinel-datasource-consul/src/main/java/com/alibaba/csp/sentinel/datasource/consul/ConsulDataSource.java b/sentinel-extension/sentinel-datasource-consul/src/main/java/com/alibaba/csp/sentinel/datasource/consul/ConsulDataSource.java
index 132c9538..6dc4bf6e 100644
--- a/sentinel-extension/sentinel-datasource-consul/src/main/java/com/alibaba/csp/sentinel/datasource/consul/ConsulDataSource.java
+++ b/sentinel-extension/sentinel-datasource-consul/src/main/java/com/alibaba/csp/sentinel/datasource/consul/ConsulDataSource.java
@@ -1,3 +1,18 @@
+/*
+ * Copyright 1999-2018 Alibaba Group Holding Ltd.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package com.alibaba.csp.sentinel.datasource.consul;
import com.alibaba.csp.sentinel.concurrent.NamedThreadFactory;
@@ -5,6 +20,7 @@ import com.alibaba.csp.sentinel.datasource.AbstractDataSource;
import com.alibaba.csp.sentinel.datasource.Converter;
import com.alibaba.csp.sentinel.log.RecordLog;
import com.alibaba.csp.sentinel.util.AssertUtil;
+
import com.ecwid.consul.v1.ConsulClient;
import com.ecwid.consul.v1.QueryParams;
import com.ecwid.consul.v1.Response;
@@ -12,8 +28,6 @@ import com.ecwid.consul.v1.kv.model.GetValue;
import java.util.concurrent.*;
-import static java.util.concurrent.Executors.newSingleThreadExecutor;
-
/**
*
* A read-only {@code DataSource} with Consul backend.
@@ -34,42 +48,45 @@ public class ConsulDataSource extends AbstractDataSource {
private static final int DEFAULT_PORT = 8500;
- private final ConsulClient client;
-
private final String address;
+ private final String ruleKey;
+ /**
+ * Request of query will hang until timeout (in second) or get updated value.
+ */
+ private final int watchTimeout;
- private String ruleKey;
/**
* Record the data's index in Consul to watch the change.
* If lastIndex is smaller than the index of next query, it means that rule data has updated.
*/
private volatile long lastIndex;
- /**
- * Request of query will hang until timeout(ms) or get updated value.
- */
- private int watchTimeout;
- private ConsulKVWatcher watcher = new ConsulKVWatcher();
+ private final ConsulClient client;
- private ExecutorService watcherService = newSingleThreadExecutor(
- new NamedThreadFactory("sentinel-consul-ds-update", true));
+ private final ConsulKVWatcher watcher = new ConsulKVWatcher();
- public ConsulDataSource(Converter parser, String host, String ruleKey, int watchTimeout) {
- this(host, DEFAULT_PORT, ruleKey, watchTimeout, parser);
+ @SuppressWarnings("PMD.ThreadPoolCreationRule")
+ private final ExecutorService watcherService = Executors.newSingleThreadExecutor(
+ new NamedThreadFactory("sentinel-consul-ds-watcher", true));
+
+ public ConsulDataSource(String host, String ruleKey, int watchTimeoutInSecond, Converter parser) {
+ this(host, DEFAULT_PORT, ruleKey, watchTimeoutInSecond, parser);
}
/**
* Constructor of {@code ConsulDataSource}.
- * @param parser customized data parser, cannot be empty
- * @param host consul agent host
- * @param port consul agent port
- * @param ruleKey data key in Consul
- * @param watchTimeout request for querying data will be blocked until new data or timeout. The unit is ms
+ *
+ * @param parser customized data parser, cannot be empty
+ * @param host consul agent host
+ * @param port consul agent port
+ * @param ruleKey data key in Consul
+ * @param watchTimeout request for querying data will be blocked until new data or timeout. The unit is second (s)
*/
public ConsulDataSource(String host, int port, String ruleKey, int watchTimeout, Converter parser) {
super(parser);
AssertUtil.notNull(host, "Consul host can not be null");
AssertUtil.notEmpty(ruleKey, "Consul ruleKey can not be empty");
+ AssertUtil.isTrue(watchTimeout >= 0, "watchTimeout should not be negative");
this.client = new ConsulClient(host, port);
this.address = host + ":" + port;
this.ruleKey = ruleKey;
@@ -86,7 +103,8 @@ public class ConsulDataSource extends AbstractDataSource {
try {
T newValue = loadConfig();
if (newValue == null) {
- RecordLog.warn("[ConsulDataSource] WARN: initial config is null, you may have to check your data source");
+ RecordLog.warn(
+ "[ConsulDataSource] WARN: initial config is null, you may have to check your data source");
}
getProperty().updateValue(newValue);
} catch (Exception ex) {
@@ -116,14 +134,15 @@ public class ConsulDataSource extends AbstractDataSource {
private class ConsulKVWatcher implements Runnable {
private boolean running = true;
+
@Override
public void run() {
while (running) {
- // It will be blocked until watchTimeout(ms) if rule data has no update.
- Response response = getValue(ruleKey, lastIndex, watchTimeout / 1000);
+ // It will be blocked until watchTimeout(s) if rule data has no update.
+ Response response = getValue(ruleKey, lastIndex, watchTimeout);
if (response == null) {
try {
- TimeUnit.MILLISECONDS.sleep(watchTimeout);
+ TimeUnit.MILLISECONDS.sleep(watchTimeout * 1000);
} catch (InterruptedException e) {
}
continue;
@@ -136,9 +155,15 @@ public class ConsulDataSource extends AbstractDataSource {
lastIndex = currentIndex;
if (getValue != null) {
String newValue = getValue.getDecodedValue();
- getProperty().updateValue(parser.convert(newValue));
- RecordLog.info(String.format("[ConsulDataSource] New property value received for (%s, %s): %s",
- address, ruleKey, newValue));
+ try {
+ getProperty().updateValue(parser.convert(newValue));
+ RecordLog.info("[ConsulDataSource] New property value received for ({0}, {1}): {2}",
+ address, ruleKey, newValue);
+ } catch (Exception ex) {
+ // In case of parsing error.
+ RecordLog.warn("[ConsulDataSource] Failed to update value for ({0}, {1}), raw value: {2}",
+ address, ruleKey, newValue);
+ }
}
}
}
@@ -149,25 +174,28 @@ public class ConsulDataSource extends AbstractDataSource {
}
/**
- * get data from Consul immediately.
+ * Get data from Consul immediately.
*
* @param key data key in Consul
+ * @return the value associated to the key, or null if error occurs
*/
- public Response getValueImmediately(String key) {
+ private Response getValueImmediately(String key) {
return getValue(key, -1, -1);
}
+
/**
- * get data from Consul.
+ * Get data from Consul (blocking).
*
- * @param key data key in Consul
- * @param index the index of data in Consul.
+ * @param key data key in Consul
+ * @param index the index of data in Consul.
* @param waitTime time(second) for waiting get updated value.
+ * @return the value associated to the key, or null if error occurs
*/
private Response getValue(String key, long index, long waitTime) {
try {
return client.getKVValue(key, new QueryParams(waitTime, index));
} catch (Throwable t) {
- RecordLog.warn("fail to get value for key: " + key);
+ RecordLog.warn("[ConsulDataSource] Failed to get value for key: " + key, t);
}
return null;
}
diff --git a/sentinel-extension/sentinel-datasource-consul/src/test/java/com/alibaba/csp/sentinel/datasource/consul/ConsulDataSourceTest.java b/sentinel-extension/sentinel-datasource-consul/src/test/java/com/alibaba/csp/sentinel/datasource/consul/ConsulDataSourceTest.java
index 8d2c96ff..f5b531c6 100644
--- a/sentinel-extension/sentinel-datasource-consul/src/test/java/com/alibaba/csp/sentinel/datasource/consul/ConsulDataSourceTest.java
+++ b/sentinel-extension/sentinel-datasource-consul/src/test/java/com/alibaba/csp/sentinel/datasource/consul/ConsulDataSourceTest.java
@@ -1,12 +1,27 @@
+/*
+ * Copyright 1999-2018 Alibaba Group Holding Ltd.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package com.alibaba.csp.sentinel.datasource.consul;
-
import com.alibaba.csp.sentinel.datasource.Converter;
import com.alibaba.csp.sentinel.datasource.ReadableDataSource;
import com.alibaba.csp.sentinel.slots.block.flow.FlowRule;
import com.alibaba.csp.sentinel.slots.block.flow.FlowRuleManager;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.TypeReference;
+
import com.ecwid.consul.v1.ConsulClient;
import com.ecwid.consul.v1.Response;
import com.pszymczyk.consul.ConsulProcess;
@@ -16,44 +31,43 @@ import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
+import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.TimeUnit;
-
+/**
+ * @author wavesZh
+ */
public class ConsulDataSourceTest {
- private ConsulProcess consul;
+ private final String ruleKey = "sentinel.rules.flow.ruleKey";
+ private final int waitTimeoutInSecond = 1;
+
+ private ConsulProcess consul;
private ConsulClient client;
private ReadableDataSource> consulDataSource;
- private String host = "127.0.0.1";
-
- private int port;
-
- private String ruleKey = "sentinel.rules.flow.ruleKey";
-
- private int waitTimeout = 60;
-
private List rules;
@Before
public void init() {
this.consul = ConsulStarterBuilder.consulStarter()
- .build()
- .start();
- this.port = consul.getHttpPort();
+ .build()
+ .start();
+ int port = consul.getHttpPort();
+ String host = "127.0.0.1";
client = new ConsulClient(host, port);
Converter> flowConfigParser = buildFlowConfigParser();
String flowRulesJson =
- "[{\"resource\":\"test\", \"limitApp\":\"default\", \"grade\":1, \"count\":\"0.0\", \"strategy\":0, "
- + "\"refResource\":null, "
- +
- "\"controlBehavior\":0, \"warmUpPeriodSec\":10, \"maxQueueingTimeMs\":500, \"controller\":null}]";
+ "[{\"resource\":\"test\", \"limitApp\":\"default\", \"grade\":1, \"count\":\"0.0\", \"strategy\":0, "
+ + "\"refResource\":null, "
+ +
+ "\"controlBehavior\":0, \"warmUpPeriodSec\":10, \"maxQueueingTimeMs\":500, \"controller\":null}]";
initConsulRuleData(flowRulesJson);
rules = flowConfigParser.convert(flowRulesJson);
- consulDataSource = new ConsulDataSource<>(host, port, ruleKey, waitTimeout, flowConfigParser);
+ consulDataSource = new ConsulDataSource<>(host, port, ruleKey, waitTimeoutInSecond, flowConfigParser);
FlowRuleManager.register2Property(consulDataSource.getProperty());
}
@@ -65,34 +79,31 @@ public class ConsulDataSourceTest {
if (consul != null) {
consul.close();
}
+ FlowRuleManager.loadRules(new ArrayList<>());
}
-
@Test
public void testConsulDataSourceWhenInit() {
List rules = FlowRuleManager.getRules();
Assert.assertEquals(this.rules, rules);
}
-
@Test
public void testConsulDataSourceWhenUpdate() throws InterruptedException {
rules.get(0).setMaxQueueingTimeMs(new Random().nextInt());
client.setKVValue(ruleKey, JSON.toJSONString(rules));
- TimeUnit.MILLISECONDS.sleep(waitTimeout);
+ TimeUnit.SECONDS.sleep(waitTimeoutInSecond);
List rules = FlowRuleManager.getRules();
Assert.assertEquals(this.rules, rules);
}
-
private Converter> buildFlowConfigParser() {
return source -> JSON.parseObject(source, new TypeReference>() {});
}
private void initConsulRuleData(String flowRulesJson) {
Response response = client.setKVValue(ruleKey, flowRulesJson);
- Assert.assertEquals(Boolean.TRUE, response.getValue());
+ Assert.assertEquals(Boolean.TRUE, response.getValue());
}
-
-}
\ No newline at end of file
+}