Improve ConsulDataSource and add README.md for the module

- Change the unit of waitTimeout from ms to s (the previous implementation lay suffer the bug that ms/1000=0 when ms<=1000)
- Improve the error handling when parsing the value
- Add README.md

Signed-off-by: Eric Zhao <sczyh16@gmail.com>
This commit is contained in:
Eric Zhao 2019-08-14 20:34:34 +08:00
parent b2ff4b719b
commit 2682bd1aae
3 changed files with 127 additions and 58 deletions

View File

@ -0,0 +1,30 @@
# Sentinel DataSource Consul
Sentinel DataSource Consul provides integration with Consul. The data source leverages blocking query (backed by
long polling) of Consul.
> **NOTE**: This module requires JDK 1.8 or later.
## Usage
To use Sentinel DataSource Consul, you could add the following dependency:
```xml
<dependency>
<groupId>com.alibaba.csp</groupId>
<artifactId>sentinel-datasource-consul</artifactId>
<version>x.y.z</version>
</dependency>
```
Then you can create a `ConsulDataSource` and register to rule managers.
For instance:
```java
ReadableDataSource<String, List<FlowRule>> dataSource = new ConsulDataSource<>(host, port, ruleKey, waitTimeoutInSecond, flowConfigParser);
FlowRuleManager.register2Property(dataSource.getProperty());
```
- `ruleKey`: the rule persistence key
- `waitTimeoutInSecond`: long polling timeout (in second) of the Consul API client

View File

@ -1,3 +1,18 @@
/*
* Copyright 1999-2018 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.csp.sentinel.datasource.consul; package com.alibaba.csp.sentinel.datasource.consul;
import com.alibaba.csp.sentinel.concurrent.NamedThreadFactory; import com.alibaba.csp.sentinel.concurrent.NamedThreadFactory;
@ -5,6 +20,7 @@ import com.alibaba.csp.sentinel.datasource.AbstractDataSource;
import com.alibaba.csp.sentinel.datasource.Converter; import com.alibaba.csp.sentinel.datasource.Converter;
import com.alibaba.csp.sentinel.log.RecordLog; import com.alibaba.csp.sentinel.log.RecordLog;
import com.alibaba.csp.sentinel.util.AssertUtil; import com.alibaba.csp.sentinel.util.AssertUtil;
import com.ecwid.consul.v1.ConsulClient; import com.ecwid.consul.v1.ConsulClient;
import com.ecwid.consul.v1.QueryParams; import com.ecwid.consul.v1.QueryParams;
import com.ecwid.consul.v1.Response; import com.ecwid.consul.v1.Response;
@ -12,8 +28,6 @@ import com.ecwid.consul.v1.kv.model.GetValue;
import java.util.concurrent.*; import java.util.concurrent.*;
import static java.util.concurrent.Executors.newSingleThreadExecutor;
/** /**
* <p> * <p>
* A read-only {@code DataSource} with Consul backend. * A read-only {@code DataSource} with Consul backend.
@ -34,42 +48,45 @@ public class ConsulDataSource<T> extends AbstractDataSource<String, T> {
private static final int DEFAULT_PORT = 8500; private static final int DEFAULT_PORT = 8500;
private final ConsulClient client;
private final String address; private final String address;
private final String ruleKey;
/**
* Request of query will hang until timeout (in second) or get updated value.
*/
private final int watchTimeout;
private String ruleKey;
/** /**
* Record the data's index in Consul to watch the change. * Record the data's index in Consul to watch the change.
* If lastIndex is smaller than the index of next query, it means that rule data has updated. * If lastIndex is smaller than the index of next query, it means that rule data has updated.
*/ */
private volatile long lastIndex; private volatile long lastIndex;
/**
* Request of query will hang until timeout(ms) or get updated value.
*/
private int watchTimeout;
private ConsulKVWatcher watcher = new ConsulKVWatcher(); private final ConsulClient client;
private ExecutorService watcherService = newSingleThreadExecutor( private final ConsulKVWatcher watcher = new ConsulKVWatcher();
new NamedThreadFactory("sentinel-consul-ds-update", true));
public ConsulDataSource(Converter<String, T> parser, String host, String ruleKey, int watchTimeout) { @SuppressWarnings("PMD.ThreadPoolCreationRule")
this(host, DEFAULT_PORT, ruleKey, watchTimeout, parser); private final ExecutorService watcherService = Executors.newSingleThreadExecutor(
new NamedThreadFactory("sentinel-consul-ds-watcher", true));
public ConsulDataSource(String host, String ruleKey, int watchTimeoutInSecond, Converter<String, T> parser) {
this(host, DEFAULT_PORT, ruleKey, watchTimeoutInSecond, parser);
} }
/** /**
* Constructor of {@code ConsulDataSource}. * Constructor of {@code ConsulDataSource}.
*
* @param parser customized data parser, cannot be empty * @param parser customized data parser, cannot be empty
* @param host consul agent host * @param host consul agent host
* @param port consul agent port * @param port consul agent port
* @param ruleKey data key in Consul * @param ruleKey data key in Consul
* @param watchTimeout request for querying data will be blocked until new data or timeout. The unit is ms * @param watchTimeout request for querying data will be blocked until new data or timeout. The unit is second (s)
*/ */
public ConsulDataSource(String host, int port, String ruleKey, int watchTimeout, Converter<String, T> parser) { public ConsulDataSource(String host, int port, String ruleKey, int watchTimeout, Converter<String, T> parser) {
super(parser); super(parser);
AssertUtil.notNull(host, "Consul host can not be null"); AssertUtil.notNull(host, "Consul host can not be null");
AssertUtil.notEmpty(ruleKey, "Consul ruleKey can not be empty"); AssertUtil.notEmpty(ruleKey, "Consul ruleKey can not be empty");
AssertUtil.isTrue(watchTimeout >= 0, "watchTimeout should not be negative");
this.client = new ConsulClient(host, port); this.client = new ConsulClient(host, port);
this.address = host + ":" + port; this.address = host + ":" + port;
this.ruleKey = ruleKey; this.ruleKey = ruleKey;
@ -86,7 +103,8 @@ public class ConsulDataSource<T> extends AbstractDataSource<String, T> {
try { try {
T newValue = loadConfig(); T newValue = loadConfig();
if (newValue == null) { if (newValue == null) {
RecordLog.warn("[ConsulDataSource] WARN: initial config is null, you may have to check your data source"); RecordLog.warn(
"[ConsulDataSource] WARN: initial config is null, you may have to check your data source");
} }
getProperty().updateValue(newValue); getProperty().updateValue(newValue);
} catch (Exception ex) { } catch (Exception ex) {
@ -116,14 +134,15 @@ public class ConsulDataSource<T> extends AbstractDataSource<String, T> {
private class ConsulKVWatcher implements Runnable { private class ConsulKVWatcher implements Runnable {
private boolean running = true; private boolean running = true;
@Override @Override
public void run() { public void run() {
while (running) { while (running) {
// It will be blocked until watchTimeout(ms) if rule data has no update. // It will be blocked until watchTimeout(s) if rule data has no update.
Response<GetValue> response = getValue(ruleKey, lastIndex, watchTimeout / 1000); Response<GetValue> response = getValue(ruleKey, lastIndex, watchTimeout);
if (response == null) { if (response == null) {
try { try {
TimeUnit.MILLISECONDS.sleep(watchTimeout); TimeUnit.MILLISECONDS.sleep(watchTimeout * 1000);
} catch (InterruptedException e) { } catch (InterruptedException e) {
} }
continue; continue;
@ -136,9 +155,15 @@ public class ConsulDataSource<T> extends AbstractDataSource<String, T> {
lastIndex = currentIndex; lastIndex = currentIndex;
if (getValue != null) { if (getValue != null) {
String newValue = getValue.getDecodedValue(); String newValue = getValue.getDecodedValue();
try {
getProperty().updateValue(parser.convert(newValue)); getProperty().updateValue(parser.convert(newValue));
RecordLog.info(String.format("[ConsulDataSource] New property value received for (%s, %s): %s", RecordLog.info("[ConsulDataSource] New property value received for ({0}, {1}): {2}",
address, ruleKey, newValue)); address, ruleKey, newValue);
} catch (Exception ex) {
// In case of parsing error.
RecordLog.warn("[ConsulDataSource] Failed to update value for ({0}, {1}), raw value: {2}",
address, ruleKey, newValue);
}
} }
} }
} }
@ -149,25 +174,28 @@ public class ConsulDataSource<T> extends AbstractDataSource<String, T> {
} }
/** /**
* get data from Consul immediately. * Get data from Consul immediately.
* *
* @param key data key in Consul * @param key data key in Consul
* @return the value associated to the key, or null if error occurs
*/ */
public Response<GetValue> getValueImmediately(String key) { private Response<GetValue> getValueImmediately(String key) {
return getValue(key, -1, -1); return getValue(key, -1, -1);
} }
/** /**
* get data from Consul. * Get data from Consul (blocking).
* *
* @param key data key in Consul * @param key data key in Consul
* @param index the index of data in Consul. * @param index the index of data in Consul.
* @param waitTime time(second) for waiting get updated value. * @param waitTime time(second) for waiting get updated value.
* @return the value associated to the key, or null if error occurs
*/ */
private Response<GetValue> getValue(String key, long index, long waitTime) { private Response<GetValue> getValue(String key, long index, long waitTime) {
try { try {
return client.getKVValue(key, new QueryParams(waitTime, index)); return client.getKVValue(key, new QueryParams(waitTime, index));
} catch (Throwable t) { } catch (Throwable t) {
RecordLog.warn("fail to get value for key: " + key); RecordLog.warn("[ConsulDataSource] Failed to get value for key: " + key, t);
} }
return null; return null;
} }

View File

@ -1,12 +1,27 @@
/*
* Copyright 1999-2018 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.csp.sentinel.datasource.consul; package com.alibaba.csp.sentinel.datasource.consul;
import com.alibaba.csp.sentinel.datasource.Converter; import com.alibaba.csp.sentinel.datasource.Converter;
import com.alibaba.csp.sentinel.datasource.ReadableDataSource; import com.alibaba.csp.sentinel.datasource.ReadableDataSource;
import com.alibaba.csp.sentinel.slots.block.flow.FlowRule; import com.alibaba.csp.sentinel.slots.block.flow.FlowRule;
import com.alibaba.csp.sentinel.slots.block.flow.FlowRuleManager; import com.alibaba.csp.sentinel.slots.block.flow.FlowRuleManager;
import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.TypeReference; import com.alibaba.fastjson.TypeReference;
import com.ecwid.consul.v1.ConsulClient; import com.ecwid.consul.v1.ConsulClient;
import com.ecwid.consul.v1.Response; import com.ecwid.consul.v1.Response;
import com.pszymczyk.consul.ConsulProcess; import com.pszymczyk.consul.ConsulProcess;
@ -16,26 +31,24 @@ import org.junit.Assert;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.Random; import java.util.Random;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
/**
* @author wavesZh
*/
public class ConsulDataSourceTest { public class ConsulDataSourceTest {
private ConsulProcess consul;
private final String ruleKey = "sentinel.rules.flow.ruleKey";
private final int waitTimeoutInSecond = 1;
private ConsulProcess consul;
private ConsulClient client; private ConsulClient client;
private ReadableDataSource<String, List<FlowRule>> consulDataSource; private ReadableDataSource<String, List<FlowRule>> consulDataSource;
private String host = "127.0.0.1";
private int port;
private String ruleKey = "sentinel.rules.flow.ruleKey";
private int waitTimeout = 60;
private List<FlowRule> rules; private List<FlowRule> rules;
@Before @Before
@ -43,7 +56,8 @@ public class ConsulDataSourceTest {
this.consul = ConsulStarterBuilder.consulStarter() this.consul = ConsulStarterBuilder.consulStarter()
.build() .build()
.start(); .start();
this.port = consul.getHttpPort(); int port = consul.getHttpPort();
String host = "127.0.0.1";
client = new ConsulClient(host, port); client = new ConsulClient(host, port);
Converter<String, List<FlowRule>> flowConfigParser = buildFlowConfigParser(); Converter<String, List<FlowRule>> flowConfigParser = buildFlowConfigParser();
String flowRulesJson = String flowRulesJson =
@ -53,7 +67,7 @@ public class ConsulDataSourceTest {
"\"controlBehavior\":0, \"warmUpPeriodSec\":10, \"maxQueueingTimeMs\":500, \"controller\":null}]"; "\"controlBehavior\":0, \"warmUpPeriodSec\":10, \"maxQueueingTimeMs\":500, \"controller\":null}]";
initConsulRuleData(flowRulesJson); initConsulRuleData(flowRulesJson);
rules = flowConfigParser.convert(flowRulesJson); rules = flowConfigParser.convert(flowRulesJson);
consulDataSource = new ConsulDataSource<>(host, port, ruleKey, waitTimeout, flowConfigParser); consulDataSource = new ConsulDataSource<>(host, port, ruleKey, waitTimeoutInSecond, flowConfigParser);
FlowRuleManager.register2Property(consulDataSource.getProperty()); FlowRuleManager.register2Property(consulDataSource.getProperty());
} }
@ -65,26 +79,24 @@ public class ConsulDataSourceTest {
if (consul != null) { if (consul != null) {
consul.close(); consul.close();
} }
FlowRuleManager.loadRules(new ArrayList<>());
} }
@Test @Test
public void testConsulDataSourceWhenInit() { public void testConsulDataSourceWhenInit() {
List<FlowRule> rules = FlowRuleManager.getRules(); List<FlowRule> rules = FlowRuleManager.getRules();
Assert.assertEquals(this.rules, rules); Assert.assertEquals(this.rules, rules);
} }
@Test @Test
public void testConsulDataSourceWhenUpdate() throws InterruptedException { public void testConsulDataSourceWhenUpdate() throws InterruptedException {
rules.get(0).setMaxQueueingTimeMs(new Random().nextInt()); rules.get(0).setMaxQueueingTimeMs(new Random().nextInt());
client.setKVValue(ruleKey, JSON.toJSONString(rules)); client.setKVValue(ruleKey, JSON.toJSONString(rules));
TimeUnit.MILLISECONDS.sleep(waitTimeout); TimeUnit.SECONDS.sleep(waitTimeoutInSecond);
List<FlowRule> rules = FlowRuleManager.getRules(); List<FlowRule> rules = FlowRuleManager.getRules();
Assert.assertEquals(this.rules, rules); Assert.assertEquals(this.rules, rules);
} }
private Converter<String, List<FlowRule>> buildFlowConfigParser() { private Converter<String, List<FlowRule>> buildFlowConfigParser() {
return source -> JSON.parseObject(source, new TypeReference<List<FlowRule>>() {}); return source -> JSON.parseObject(source, new TypeReference<List<FlowRule>>() {});
} }
@ -94,5 +106,4 @@ public class ConsulDataSourceTest {
Assert.assertEquals(Boolean.TRUE, response.getValue()); Assert.assertEquals(Boolean.TRUE, response.getValue());
} }
} }