diff --git a/sentinel-extension/pom.xml b/sentinel-extension/pom.xml
index 075323a6..f7ae6b6a 100755
--- a/sentinel-extension/pom.xml
+++ b/sentinel-extension/pom.xml
@@ -19,6 +19,7 @@
sentinel-datasource-redis
sentinel-annotation-aspectj
sentinel-parameter-flow-control
+ sentinel-datasource-consul
diff --git a/sentinel-extension/sentinel-datasource-consul/pom.xml b/sentinel-extension/sentinel-datasource-consul/pom.xml
new file mode 100644
index 00000000..02730215
--- /dev/null
+++ b/sentinel-extension/sentinel-datasource-consul/pom.xml
@@ -0,0 +1,49 @@
+
+
+
+ sentinel-extension
+ com.alibaba.csp
+ 1.7.0-SNAPSHOT
+
+ 4.0.0
+
+ sentinel-datasource-consul
+ jar
+
+
+ 1.8
+ 1.8
+ 1.4.2
+ 2.0.0
+
+
+
+
+ com.alibaba.csp
+ sentinel-datasource-extension
+
+
+ com.ecwid.consul
+ consul-api
+ ${consul.version}
+
+
+ com.pszymczyk.consul
+ embedded-consul
+ ${consul.process.version}
+ test
+
+
+ junit
+ junit
+ test
+
+
+ com.alibaba
+ fastjson
+ test
+
+
+
\ No newline at end of file
diff --git a/sentinel-extension/sentinel-datasource-consul/src/main/java/com/alibaba/csp/sentinel/datasource/consul/ConsulDataSource.java b/sentinel-extension/sentinel-datasource-consul/src/main/java/com/alibaba/csp/sentinel/datasource/consul/ConsulDataSource.java
new file mode 100644
index 00000000..132c9538
--- /dev/null
+++ b/sentinel-extension/sentinel-datasource-consul/src/main/java/com/alibaba/csp/sentinel/datasource/consul/ConsulDataSource.java
@@ -0,0 +1,175 @@
+package com.alibaba.csp.sentinel.datasource.consul;
+
+import com.alibaba.csp.sentinel.concurrent.NamedThreadFactory;
+import com.alibaba.csp.sentinel.datasource.AbstractDataSource;
+import com.alibaba.csp.sentinel.datasource.Converter;
+import com.alibaba.csp.sentinel.log.RecordLog;
+import com.alibaba.csp.sentinel.util.AssertUtil;
+import com.ecwid.consul.v1.ConsulClient;
+import com.ecwid.consul.v1.QueryParams;
+import com.ecwid.consul.v1.Response;
+import com.ecwid.consul.v1.kv.model.GetValue;
+
+import java.util.concurrent.*;
+
+import static java.util.concurrent.Executors.newSingleThreadExecutor;
+
+/**
+ *
+ * A read-only {@code DataSource} with Consul backend.
+ *
+ *
+ * The data source first initial rules from a Consul during initialization.
+ * Then it start a watcher to observe the updates of rule date and update to memory.
+ *
+ * Consul do not provide http api to watch the update of KV,so it use a long polling and
+ * blocking queries of the Consul's feature
+ * to watch and update value easily.When Querying data by index will blocking until change or timeout. If
+ * the index of the current query is larger than before, it means that the data has changed.
+ *
+ *
+ * @author wavesZh
+ */
+public class ConsulDataSource extends AbstractDataSource {
+
+ private static final int DEFAULT_PORT = 8500;
+
+ private final ConsulClient client;
+
+ private final String address;
+
+ private String ruleKey;
+ /**
+ * Record the data's index in Consul to watch the change.
+ * If lastIndex is smaller than the index of next query, it means that rule data has updated.
+ */
+ private volatile long lastIndex;
+ /**
+ * Request of query will hang until timeout(ms) or get updated value.
+ */
+ private int watchTimeout;
+
+ private ConsulKVWatcher watcher = new ConsulKVWatcher();
+
+ private ExecutorService watcherService = newSingleThreadExecutor(
+ new NamedThreadFactory("sentinel-consul-ds-update", true));
+
+ public ConsulDataSource(Converter parser, String host, String ruleKey, int watchTimeout) {
+ this(host, DEFAULT_PORT, ruleKey, watchTimeout, parser);
+ }
+
+ /**
+ * Constructor of {@code ConsulDataSource}.
+ * @param parser customized data parser, cannot be empty
+ * @param host consul agent host
+ * @param port consul agent port
+ * @param ruleKey data key in Consul
+ * @param watchTimeout request for querying data will be blocked until new data or timeout. The unit is ms
+ */
+ public ConsulDataSource(String host, int port, String ruleKey, int watchTimeout, Converter parser) {
+ super(parser);
+ AssertUtil.notNull(host, "Consul host can not be null");
+ AssertUtil.notEmpty(ruleKey, "Consul ruleKey can not be empty");
+ this.client = new ConsulClient(host, port);
+ this.address = host + ":" + port;
+ this.ruleKey = ruleKey;
+ this.watchTimeout = watchTimeout;
+ loadInitialConfig();
+ startKVWatcher();
+ }
+
+ private void startKVWatcher() {
+ watcherService.submit(watcher);
+ }
+
+ private void loadInitialConfig() {
+ try {
+ T newValue = loadConfig();
+ if (newValue == null) {
+ RecordLog.warn("[ConsulDataSource] WARN: initial config is null, you may have to check your data source");
+ }
+ getProperty().updateValue(newValue);
+ } catch (Exception ex) {
+ RecordLog.warn("[ConsulDataSource] Error when loading initial config", ex);
+ }
+ }
+
+ @Override
+ public String readSource() throws Exception {
+ if (this.client == null) {
+ throw new IllegalStateException("Consul has not been initialized or error occurred");
+ }
+ Response response = getValueImmediately(ruleKey);
+ if (response != null) {
+ GetValue value = response.getValue();
+ lastIndex = response.getConsulIndex();
+ return value != null ? value.getDecodedValue() : null;
+ }
+ return null;
+ }
+
+ @Override
+ public void close() throws Exception {
+ watcher.stop();
+ watcherService.shutdown();
+ }
+
+ private class ConsulKVWatcher implements Runnable {
+ private boolean running = true;
+ @Override
+ public void run() {
+ while (running) {
+ // It will be blocked until watchTimeout(ms) if rule data has no update.
+ Response response = getValue(ruleKey, lastIndex, watchTimeout / 1000);
+ if (response == null) {
+ try {
+ TimeUnit.MILLISECONDS.sleep(watchTimeout);
+ } catch (InterruptedException e) {
+ }
+ continue;
+ }
+ GetValue getValue = response.getValue();
+ Long currentIndex = response.getConsulIndex();
+ if (currentIndex == null || currentIndex <= lastIndex) {
+ continue;
+ }
+ lastIndex = currentIndex;
+ if (getValue != null) {
+ String newValue = getValue.getDecodedValue();
+ getProperty().updateValue(parser.convert(newValue));
+ RecordLog.info(String.format("[ConsulDataSource] New property value received for (%s, %s): %s",
+ address, ruleKey, newValue));
+ }
+ }
+ }
+
+ private void stop() {
+ running = false;
+ }
+ }
+
+ /**
+ * get data from Consul immediately.
+ *
+ * @param key data key in Consul
+ */
+ public Response getValueImmediately(String key) {
+ return getValue(key, -1, -1);
+ }
+ /**
+ * get data from Consul.
+ *
+ * @param key data key in Consul
+ * @param index the index of data in Consul.
+ * @param waitTime time(second) for waiting get updated value.
+ */
+ private Response getValue(String key, long index, long waitTime) {
+ try {
+ return client.getKVValue(key, new QueryParams(waitTime, index));
+ } catch (Throwable t) {
+ RecordLog.warn("fail to get value for key: " + key);
+ }
+ return null;
+ }
+
+}
diff --git a/sentinel-extension/sentinel-datasource-consul/src/test/java/com/alibaba/csp/sentinel/datasource/consul/ConsulDataSourceTest.java b/sentinel-extension/sentinel-datasource-consul/src/test/java/com/alibaba/csp/sentinel/datasource/consul/ConsulDataSourceTest.java
new file mode 100644
index 00000000..8d2c96ff
--- /dev/null
+++ b/sentinel-extension/sentinel-datasource-consul/src/test/java/com/alibaba/csp/sentinel/datasource/consul/ConsulDataSourceTest.java
@@ -0,0 +1,98 @@
+package com.alibaba.csp.sentinel.datasource.consul;
+
+
+import com.alibaba.csp.sentinel.datasource.Converter;
+import com.alibaba.csp.sentinel.datasource.ReadableDataSource;
+import com.alibaba.csp.sentinel.slots.block.flow.FlowRule;
+import com.alibaba.csp.sentinel.slots.block.flow.FlowRuleManager;
+import com.alibaba.fastjson.JSON;
+import com.alibaba.fastjson.TypeReference;
+import com.ecwid.consul.v1.ConsulClient;
+import com.ecwid.consul.v1.Response;
+import com.pszymczyk.consul.ConsulProcess;
+import com.pszymczyk.consul.ConsulStarterBuilder;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.TimeUnit;
+
+
+public class ConsulDataSourceTest {
+ private ConsulProcess consul;
+
+ private ConsulClient client;
+
+ private ReadableDataSource> consulDataSource;
+
+ private String host = "127.0.0.1";
+
+ private int port;
+
+ private String ruleKey = "sentinel.rules.flow.ruleKey";
+
+ private int waitTimeout = 60;
+
+ private List rules;
+
+ @Before
+ public void init() {
+ this.consul = ConsulStarterBuilder.consulStarter()
+ .build()
+ .start();
+ this.port = consul.getHttpPort();
+ client = new ConsulClient(host, port);
+ Converter> flowConfigParser = buildFlowConfigParser();
+ String flowRulesJson =
+ "[{\"resource\":\"test\", \"limitApp\":\"default\", \"grade\":1, \"count\":\"0.0\", \"strategy\":0, "
+ + "\"refResource\":null, "
+ +
+ "\"controlBehavior\":0, \"warmUpPeriodSec\":10, \"maxQueueingTimeMs\":500, \"controller\":null}]";
+ initConsulRuleData(flowRulesJson);
+ rules = flowConfigParser.convert(flowRulesJson);
+ consulDataSource = new ConsulDataSource<>(host, port, ruleKey, waitTimeout, flowConfigParser);
+ FlowRuleManager.register2Property(consulDataSource.getProperty());
+ }
+
+ @After
+ public void clean() throws Exception {
+ if (consulDataSource != null) {
+ consulDataSource.close();
+ }
+ if (consul != null) {
+ consul.close();
+ }
+ }
+
+
+ @Test
+ public void testConsulDataSourceWhenInit() {
+ List rules = FlowRuleManager.getRules();
+ Assert.assertEquals(this.rules, rules);
+ }
+
+
+ @Test
+ public void testConsulDataSourceWhenUpdate() throws InterruptedException {
+ rules.get(0).setMaxQueueingTimeMs(new Random().nextInt());
+ client.setKVValue(ruleKey, JSON.toJSONString(rules));
+ TimeUnit.MILLISECONDS.sleep(waitTimeout);
+ List rules = FlowRuleManager.getRules();
+ Assert.assertEquals(this.rules, rules);
+ }
+
+
+ private Converter> buildFlowConfigParser() {
+ return source -> JSON.parseObject(source, new TypeReference>() {});
+ }
+
+ private void initConsulRuleData(String flowRulesJson) {
+ Response response = client.setKVValue(ruleKey, flowRulesJson);
+ Assert.assertEquals(Boolean.TRUE, response.getValue());
+ }
+
+
+}
\ No newline at end of file