Add AuthInfo parameter in the constructor of ZooKeeperDataSource to support ACL (#508)
This commit is contained in:
parent
5d2170282f
commit
ec0883d22e
|
|
@ -1,5 +1,6 @@
|
||||||
package com.alibaba.csp.sentinel.datasource.zookeeper;
|
package com.alibaba.csp.sentinel.datasource.zookeeper;
|
||||||
|
|
||||||
|
import java.util.List;
|
||||||
import java.util.concurrent.ArrayBlockingQueue;
|
import java.util.concurrent.ArrayBlockingQueue;
|
||||||
import java.util.concurrent.ExecutorService;
|
import java.util.concurrent.ExecutorService;
|
||||||
import java.util.concurrent.ThreadPoolExecutor;
|
import java.util.concurrent.ThreadPoolExecutor;
|
||||||
|
|
@ -11,6 +12,7 @@ import com.alibaba.csp.sentinel.datasource.Converter;
|
||||||
import com.alibaba.csp.sentinel.log.RecordLog;
|
import com.alibaba.csp.sentinel.log.RecordLog;
|
||||||
import com.alibaba.csp.sentinel.util.StringUtil;
|
import com.alibaba.csp.sentinel.util.StringUtil;
|
||||||
|
|
||||||
|
import org.apache.curator.framework.AuthInfo;
|
||||||
import org.apache.curator.framework.CuratorFramework;
|
import org.apache.curator.framework.CuratorFramework;
|
||||||
import org.apache.curator.framework.CuratorFrameworkFactory;
|
import org.apache.curator.framework.CuratorFrameworkFactory;
|
||||||
import org.apache.curator.framework.recipes.cache.ChildData;
|
import org.apache.curator.framework.recipes.cache.ChildData;
|
||||||
|
|
@ -47,7 +49,7 @@ public class ZookeeperDataSource<T> extends AbstractDataSource<String, T> {
|
||||||
}
|
}
|
||||||
this.path = path;
|
this.path = path;
|
||||||
|
|
||||||
init(serverAddr);
|
init(serverAddr, null);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
@ -61,11 +63,25 @@ public class ZookeeperDataSource<T> extends AbstractDataSource<String, T> {
|
||||||
}
|
}
|
||||||
this.path = getPath(groupId, dataId);
|
this.path = getPath(groupId, dataId);
|
||||||
|
|
||||||
init(serverAddr);
|
init(serverAddr, null);
|
||||||
}
|
}
|
||||||
|
|
||||||
private void init(final String serverAddr) {
|
/**
|
||||||
initZookeeperListener(serverAddr);
|
* This constructor adds authentication information.
|
||||||
|
*/
|
||||||
|
public ZookeeperDataSource(final String serverAddr, final List<AuthInfo> authInfos, final String groupId, final String dataId,
|
||||||
|
Converter<String, T> parser) {
|
||||||
|
super(parser);
|
||||||
|
if (StringUtil.isBlank(serverAddr) || StringUtil.isBlank(groupId) || StringUtil.isBlank(dataId)) {
|
||||||
|
throw new IllegalArgumentException(String.format("Bad argument: serverAddr=[%s], authInfos=[%s], groupId=[%s], dataId=[%s]", serverAddr, authInfos, groupId, dataId));
|
||||||
|
}
|
||||||
|
this.path = getPath(groupId, dataId);
|
||||||
|
|
||||||
|
init(serverAddr, authInfos);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void init(final String serverAddr, final List<AuthInfo> authInfos) {
|
||||||
|
initZookeeperListener(serverAddr, authInfos);
|
||||||
loadInitialConfig();
|
loadInitialConfig();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -81,7 +97,7 @@ public class ZookeeperDataSource<T> extends AbstractDataSource<String, T> {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private void initZookeeperListener(final String serverAddr) {
|
private void initZookeeperListener(final String serverAddr, final List<AuthInfo> authInfos) {
|
||||||
try {
|
try {
|
||||||
|
|
||||||
this.listener = new NodeCacheListener() {
|
this.listener = new NodeCacheListener() {
|
||||||
|
|
@ -101,7 +117,15 @@ public class ZookeeperDataSource<T> extends AbstractDataSource<String, T> {
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
|
if (authInfos == null || authInfos.size() == 0) {
|
||||||
this.zkClient = CuratorFrameworkFactory.newClient(serverAddr, new ExponentialBackoffRetry(SLEEP_TIME, RETRY_TIMES));
|
this.zkClient = CuratorFrameworkFactory.newClient(serverAddr, new ExponentialBackoffRetry(SLEEP_TIME, RETRY_TIMES));
|
||||||
|
} else {
|
||||||
|
this.zkClient = CuratorFrameworkFactory.builder().
|
||||||
|
connectString(serverAddr).
|
||||||
|
retryPolicy(new ExponentialBackoffRetry(SLEEP_TIME, RETRY_TIMES)).
|
||||||
|
authorization(authInfos).
|
||||||
|
build();
|
||||||
|
}
|
||||||
this.zkClient.start();
|
this.zkClient.start();
|
||||||
Stat stat = this.zkClient.checkExists().forPath(this.path);
|
Stat stat = this.zkClient.checkExists().forPath(this.path);
|
||||||
if (stat == null) {
|
if (stat == null) {
|
||||||
|
|
|
||||||
|
|
@ -1,10 +1,5 @@
|
||||||
package com.alibaba.csp.sentinel.datasource.zookeeper;
|
package com.alibaba.csp.sentinel.datasource.zookeeper;
|
||||||
|
|
||||||
import java.util.Collections;
|
|
||||||
import java.util.List;
|
|
||||||
import java.util.concurrent.Callable;
|
|
||||||
import java.util.concurrent.TimeUnit;
|
|
||||||
|
|
||||||
import com.alibaba.csp.sentinel.datasource.Converter;
|
import com.alibaba.csp.sentinel.datasource.Converter;
|
||||||
import com.alibaba.csp.sentinel.datasource.ReadableDataSource;
|
import com.alibaba.csp.sentinel.datasource.ReadableDataSource;
|
||||||
import com.alibaba.csp.sentinel.slots.block.RuleConstant;
|
import com.alibaba.csp.sentinel.slots.block.RuleConstant;
|
||||||
|
|
@ -12,17 +7,27 @@ import com.alibaba.csp.sentinel.slots.block.flow.FlowRule;
|
||||||
import com.alibaba.csp.sentinel.slots.block.flow.FlowRuleManager;
|
import com.alibaba.csp.sentinel.slots.block.flow.FlowRuleManager;
|
||||||
import com.alibaba.fastjson.JSON;
|
import com.alibaba.fastjson.JSON;
|
||||||
import com.alibaba.fastjson.TypeReference;
|
import com.alibaba.fastjson.TypeReference;
|
||||||
|
import org.apache.curator.framework.AuthInfo;
|
||||||
import org.apache.curator.framework.CuratorFramework;
|
import org.apache.curator.framework.CuratorFramework;
|
||||||
import org.apache.curator.framework.CuratorFrameworkFactory;
|
import org.apache.curator.framework.CuratorFrameworkFactory;
|
||||||
import org.apache.curator.retry.ExponentialBackoffRetry;
|
import org.apache.curator.retry.ExponentialBackoffRetry;
|
||||||
import org.apache.curator.test.TestingServer;
|
import org.apache.curator.test.TestingServer;
|
||||||
import org.apache.zookeeper.CreateMode;
|
import org.apache.zookeeper.CreateMode;
|
||||||
|
import org.apache.zookeeper.ZooDefs;
|
||||||
|
import org.apache.zookeeper.data.ACL;
|
||||||
|
import org.apache.zookeeper.data.Id;
|
||||||
import org.apache.zookeeper.data.Stat;
|
import org.apache.zookeeper.data.Stat;
|
||||||
|
import org.apache.zookeeper.server.auth.DigestAuthenticationProvider;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
|
||||||
|
import java.util.Collections;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.concurrent.Callable;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
import static org.awaitility.Awaitility.await;
|
import static org.awaitility.Awaitility.await;
|
||||||
import static org.junit.Assert.*;
|
import static org.junit.Assert.assertEquals;
|
||||||
|
import static org.junit.Assert.assertTrue;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @author Eric Zhao
|
* @author Eric Zhao
|
||||||
|
|
@ -62,6 +67,53 @@ public class ZookeeperDataSourceTest {
|
||||||
server.stop();
|
server.stop();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testZooKeeperDataSourceAuthorization() throws Exception {
|
||||||
|
TestingServer server = new TestingServer(21812);
|
||||||
|
server.start();
|
||||||
|
|
||||||
|
final String remoteAddress = server.getConnectString();
|
||||||
|
final String groupId = "sentinel-zk-ds-demo";
|
||||||
|
final String dataId = "flow-HK";
|
||||||
|
final String path = "/" + groupId + "/" + dataId;
|
||||||
|
final String scheme = "digest";
|
||||||
|
final String auth = "root:123456";
|
||||||
|
|
||||||
|
AuthInfo authInfo = new AuthInfo(scheme, auth.getBytes());
|
||||||
|
List<AuthInfo> authInfoList = Collections.singletonList(authInfo);
|
||||||
|
|
||||||
|
CuratorFramework zkClient = CuratorFrameworkFactory.builder().
|
||||||
|
connectString(remoteAddress).
|
||||||
|
retryPolicy(new ExponentialBackoffRetry(3, 100)).
|
||||||
|
authorization(authInfoList).
|
||||||
|
build();
|
||||||
|
zkClient.start();
|
||||||
|
Stat stat = zkClient.checkExists().forPath(path);
|
||||||
|
if (stat == null) {
|
||||||
|
ACL acl = new ACL(ZooDefs.Perms.ALL, new Id(scheme, DigestAuthenticationProvider.generateDigest(auth)));
|
||||||
|
zkClient.create().creatingParentContainersIfNeeded().withACL(Collections.singletonList(acl)).forPath(path, null);
|
||||||
|
}
|
||||||
|
|
||||||
|
ReadableDataSource<String, List<FlowRule>> flowRuleDataSource = new ZookeeperDataSource<List<FlowRule>>(remoteAddress,
|
||||||
|
authInfoList, groupId, dataId,
|
||||||
|
new Converter<String, List<FlowRule>>() {
|
||||||
|
@Override
|
||||||
|
public List<FlowRule> convert(String source) {
|
||||||
|
return JSON.parseObject(source, new TypeReference<List<FlowRule>>() {
|
||||||
|
});
|
||||||
|
}
|
||||||
|
});
|
||||||
|
FlowRuleManager.register2Property(flowRuleDataSource.getProperty());
|
||||||
|
|
||||||
|
|
||||||
|
final String resourceName = "HK";
|
||||||
|
publishThenTestFor(zkClient, path, resourceName, 10);
|
||||||
|
publishThenTestFor(zkClient, path, resourceName, 15);
|
||||||
|
|
||||||
|
zkClient.close();
|
||||||
|
server.stop();
|
||||||
|
}
|
||||||
|
|
||||||
private void publishThenTestFor(CuratorFramework zkClient, String path, String resourceName, long count) throws Exception {
|
private void publishThenTestFor(CuratorFramework zkClient, String path, String resourceName, long count) throws Exception {
|
||||||
FlowRule rule = new FlowRule().setResource(resourceName)
|
FlowRule rule = new FlowRule().setResource(resourceName)
|
||||||
.setLimitApp("default")
|
.setLimitApp("default")
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue