Add ACL token support for Consul data-source (#2307)
Co-authored-by: Zhiguo.Chen <chenzhiguo@jd.com>
This commit is contained in:
parent
38e0306f95
commit
436a88562e
|
|
@ -21,6 +21,7 @@ import com.alibaba.csp.sentinel.datasource.Converter;
|
||||||
import com.alibaba.csp.sentinel.log.RecordLog;
|
import com.alibaba.csp.sentinel.log.RecordLog;
|
||||||
import com.alibaba.csp.sentinel.util.AssertUtil;
|
import com.alibaba.csp.sentinel.util.AssertUtil;
|
||||||
|
|
||||||
|
import com.alibaba.csp.sentinel.util.StringUtil;
|
||||||
import com.ecwid.consul.v1.ConsulClient;
|
import com.ecwid.consul.v1.ConsulClient;
|
||||||
import com.ecwid.consul.v1.QueryParams;
|
import com.ecwid.consul.v1.QueryParams;
|
||||||
import com.ecwid.consul.v1.Response;
|
import com.ecwid.consul.v1.Response;
|
||||||
|
|
@ -43,12 +44,14 @@ import java.util.concurrent.*;
|
||||||
* </p>
|
* </p>
|
||||||
*
|
*
|
||||||
* @author wavesZh
|
* @author wavesZh
|
||||||
|
* @author Zhiguo.Chen
|
||||||
*/
|
*/
|
||||||
public class ConsulDataSource<T> extends AbstractDataSource<String, T> {
|
public class ConsulDataSource<T> extends AbstractDataSource<String, T> {
|
||||||
|
|
||||||
private static final int DEFAULT_PORT = 8500;
|
private static final int DEFAULT_PORT = 8500;
|
||||||
|
|
||||||
private final String address;
|
private final String address;
|
||||||
|
private final String token;
|
||||||
private final String ruleKey;
|
private final String ruleKey;
|
||||||
/**
|
/**
|
||||||
* Request of query will hang until timeout (in second) or get updated value.
|
* Request of query will hang until timeout (in second) or get updated value.
|
||||||
|
|
@ -83,12 +86,27 @@ public class ConsulDataSource<T> extends AbstractDataSource<String, T> {
|
||||||
* @param watchTimeout request for querying data will be blocked until new data or timeout. The unit is second (s)
|
* @param watchTimeout request for querying data will be blocked until new data or timeout. The unit is second (s)
|
||||||
*/
|
*/
|
||||||
public ConsulDataSource(String host, int port, String ruleKey, int watchTimeout, Converter<String, T> parser) {
|
public ConsulDataSource(String host, int port, String ruleKey, int watchTimeout, Converter<String, T> parser) {
|
||||||
|
this(host, port, null, ruleKey, watchTimeout, parser);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Constructor of {@code ConsulDataSource}.
|
||||||
|
*
|
||||||
|
* @param parser customized data parser, cannot be empty
|
||||||
|
* @param host consul agent host
|
||||||
|
* @param port consul agent port
|
||||||
|
* @param token consul agent acl token
|
||||||
|
* @param ruleKey data key in Consul
|
||||||
|
* @param watchTimeout request for querying data will be blocked until new data or timeout. The unit is second (s)
|
||||||
|
*/
|
||||||
|
public ConsulDataSource(String host, int port, String token, String ruleKey, int watchTimeout, Converter<String, T> parser) {
|
||||||
super(parser);
|
super(parser);
|
||||||
AssertUtil.notNull(host, "Consul host can not be null");
|
AssertUtil.notNull(host, "Consul host can not be null");
|
||||||
AssertUtil.notEmpty(ruleKey, "Consul ruleKey can not be empty");
|
AssertUtil.notEmpty(ruleKey, "Consul ruleKey can not be empty");
|
||||||
AssertUtil.isTrue(watchTimeout >= 0, "watchTimeout should not be negative");
|
AssertUtil.isTrue(watchTimeout >= 0, "watchTimeout should not be negative");
|
||||||
this.client = new ConsulClient(host, port);
|
this.client = new ConsulClient(host, port);
|
||||||
this.address = host + ":" + port;
|
this.address = host + ":" + port;
|
||||||
|
this.token = token;
|
||||||
this.ruleKey = ruleKey;
|
this.ruleKey = ruleKey;
|
||||||
this.watchTimeout = watchTimeout;
|
this.watchTimeout = watchTimeout;
|
||||||
loadInitialConfig();
|
loadInitialConfig();
|
||||||
|
|
@ -193,7 +211,11 @@ public class ConsulDataSource<T> extends AbstractDataSource<String, T> {
|
||||||
*/
|
*/
|
||||||
private Response<GetValue> getValue(String key, long index, long waitTime) {
|
private Response<GetValue> getValue(String key, long index, long waitTime) {
|
||||||
try {
|
try {
|
||||||
return client.getKVValue(key, new QueryParams(waitTime, index));
|
if (StringUtil.isNotBlank(token)) {
|
||||||
|
return client.getKVValue(key, token, new QueryParams(waitTime, index));
|
||||||
|
} else {
|
||||||
|
return client.getKVValue(key, new QueryParams(waitTime, index));
|
||||||
|
}
|
||||||
} catch (Throwable t) {
|
} catch (Throwable t) {
|
||||||
RecordLog.warn("[ConsulDataSource] Failed to get value for key: " + key, t);
|
RecordLog.warn("[ConsulDataSource] Failed to get value for key: " + key, t);
|
||||||
}
|
}
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue