Update ZooKeeper data source integration
- Code refinement - Add test cases Signed-off-by: Eric Zhao <sczyh16@gmail.com>
This commit is contained in:
parent
3395412c57
commit
fd936a4567
|
|
@ -14,7 +14,6 @@
|
||||||
<properties>
|
<properties>
|
||||||
<zookeeper.version>3.4.13</zookeeper.version>
|
<zookeeper.version>3.4.13</zookeeper.version>
|
||||||
<curator.version>4.0.1</curator.version>
|
<curator.version>4.0.1</curator.version>
|
||||||
<curator-test.version>2.12.0</curator-test.version>
|
|
||||||
</properties>
|
</properties>
|
||||||
|
|
||||||
<dependencies>
|
<dependencies>
|
||||||
|
|
@ -44,8 +43,8 @@
|
||||||
|
|
||||||
<dependency>
|
<dependency>
|
||||||
<groupId>org.apache.curator</groupId>
|
<groupId>org.apache.curator</groupId>
|
||||||
<artifactId>curator-test</artifactId>
|
<artifactId>curator-recipes</artifactId>
|
||||||
<version>${curator-test.version}</version>
|
<version>${curator.version}</version>
|
||||||
<exclusions>
|
<exclusions>
|
||||||
<exclusion>
|
<exclusion>
|
||||||
<groupId>org.apache.zookeeper</groupId>
|
<groupId>org.apache.zookeeper</groupId>
|
||||||
|
|
|
||||||
|
|
@ -3,7 +3,6 @@ package com.alibaba.csp.sentinel.demo.datasource.zookeeper;
|
||||||
import org.apache.curator.framework.CuratorFramework;
|
import org.apache.curator.framework.CuratorFramework;
|
||||||
import org.apache.curator.framework.CuratorFrameworkFactory;
|
import org.apache.curator.framework.CuratorFrameworkFactory;
|
||||||
import org.apache.curator.retry.ExponentialBackoffRetry;
|
import org.apache.curator.retry.ExponentialBackoffRetry;
|
||||||
import org.apache.curator.test.TestingServer;
|
|
||||||
import org.apache.zookeeper.CreateMode;
|
import org.apache.zookeeper.CreateMode;
|
||||||
import org.apache.zookeeper.data.Stat;
|
import org.apache.zookeeper.data.Stat;
|
||||||
|
|
||||||
|
|
@ -18,11 +17,7 @@ public class ZookeeperConfigSender {
|
||||||
private static final int SLEEP_TIME = 1000;
|
private static final int SLEEP_TIME = 1000;
|
||||||
|
|
||||||
public static void main(String[] args) throws Exception {
|
public static void main(String[] args) throws Exception {
|
||||||
|
final String remoteAddress = "localhost:2181";
|
||||||
// 启动Zookeeper服务
|
|
||||||
TestingServer server = new TestingServer(2181);
|
|
||||||
|
|
||||||
final String remoteAddress = server.getConnectString();
|
|
||||||
final String groupId = "Sentinel-Demo";
|
final String groupId = "Sentinel-Demo";
|
||||||
final String dataId = "SYSTEM-CODE-DEMO-FLOW";
|
final String dataId = "SYSTEM-CODE-DEMO-FLOW";
|
||||||
final String rule = "[\n"
|
final String rule = "[\n"
|
||||||
|
|
@ -44,18 +39,14 @@ public class ZookeeperConfigSender {
|
||||||
zkClient.create().creatingParentContainersIfNeeded().withMode(CreateMode.PERSISTENT).forPath(path, null);
|
zkClient.create().creatingParentContainersIfNeeded().withMode(CreateMode.PERSISTENT).forPath(path, null);
|
||||||
}
|
}
|
||||||
zkClient.setData().forPath(path, rule.getBytes());
|
zkClient.setData().forPath(path, rule.getBytes());
|
||||||
// zkClient.delete().forPath(path);
|
|
||||||
|
|
||||||
try {
|
try {
|
||||||
Thread.sleep(30000L);
|
Thread.sleep(3000);
|
||||||
} catch (InterruptedException e) {
|
} catch (InterruptedException e) {
|
||||||
e.printStackTrace();
|
e.printStackTrace();
|
||||||
}
|
}
|
||||||
|
|
||||||
zkClient.close();
|
zkClient.close();
|
||||||
|
|
||||||
//停止zookeeper服务
|
|
||||||
server.stop();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private static String getPath(String groupId, String dataId) {
|
private static String getPath(String groupId, String dataId) {
|
||||||
|
|
|
||||||
|
|
@ -15,6 +15,7 @@
|
||||||
<properties>
|
<properties>
|
||||||
<zookeeper.version>3.4.13</zookeeper.version>
|
<zookeeper.version>3.4.13</zookeeper.version>
|
||||||
<curator.version>4.0.1</curator.version>
|
<curator.version>4.0.1</curator.version>
|
||||||
|
<curator.test.version>2.12.0</curator.test.version>
|
||||||
</properties>
|
</properties>
|
||||||
|
|
||||||
<dependencies>
|
<dependencies>
|
||||||
|
|
@ -39,5 +40,28 @@
|
||||||
</exclusion>
|
</exclusion>
|
||||||
</exclusions>
|
</exclusions>
|
||||||
</dependency>
|
</dependency>
|
||||||
|
|
||||||
|
<dependency>
|
||||||
|
<groupId>junit</groupId>
|
||||||
|
<artifactId>junit</artifactId>
|
||||||
|
<scope>test</scope>
|
||||||
|
</dependency>
|
||||||
|
<dependency>
|
||||||
|
<groupId>org.apache.curator</groupId>
|
||||||
|
<artifactId>curator-test</artifactId>
|
||||||
|
<version>${curator.test.version}</version>
|
||||||
|
<scope>test</scope>
|
||||||
|
<exclusions>
|
||||||
|
<exclusion>
|
||||||
|
<groupId>org.apache.zookeeper</groupId>
|
||||||
|
<artifactId>zookeeper</artifactId>
|
||||||
|
</exclusion>
|
||||||
|
</exclusions>
|
||||||
|
</dependency>
|
||||||
|
<dependency>
|
||||||
|
<groupId>com.alibaba</groupId>
|
||||||
|
<artifactId>fastjson</artifactId>
|
||||||
|
<scope>test</scope>
|
||||||
|
</dependency>
|
||||||
</dependencies>
|
</dependencies>
|
||||||
</project>
|
</project>
|
||||||
|
|
@ -10,6 +10,7 @@ import com.alibaba.csp.sentinel.datasource.AbstractDataSource;
|
||||||
import com.alibaba.csp.sentinel.datasource.ConfigParser;
|
import com.alibaba.csp.sentinel.datasource.ConfigParser;
|
||||||
import com.alibaba.csp.sentinel.log.RecordLog;
|
import com.alibaba.csp.sentinel.log.RecordLog;
|
||||||
import com.alibaba.csp.sentinel.util.StringUtil;
|
import com.alibaba.csp.sentinel.util.StringUtil;
|
||||||
|
|
||||||
import org.apache.curator.framework.CuratorFramework;
|
import org.apache.curator.framework.CuratorFramework;
|
||||||
import org.apache.curator.framework.CuratorFrameworkFactory;
|
import org.apache.curator.framework.CuratorFrameworkFactory;
|
||||||
import org.apache.curator.framework.recipes.cache.ChildData;
|
import org.apache.curator.framework.recipes.cache.ChildData;
|
||||||
|
|
@ -30,40 +31,35 @@ public class ZookeeperDataSource<T> extends AbstractDataSource<String, T> {
|
||||||
private static final int SLEEP_TIME = 1000;
|
private static final int SLEEP_TIME = 1000;
|
||||||
|
|
||||||
private final ExecutorService pool = new ThreadPoolExecutor(1, 1, 0, TimeUnit.MILLISECONDS,
|
private final ExecutorService pool = new ThreadPoolExecutor(1, 1, 0, TimeUnit.MILLISECONDS,
|
||||||
new ArrayBlockingQueue<Runnable>(1), new NamedThreadFactory("sentinel-zookeeper-ds-update"),
|
new ArrayBlockingQueue<Runnable>(1), new NamedThreadFactory("sentinel-zookeeper-ds-update"),
|
||||||
new ThreadPoolExecutor.DiscardOldestPolicy());
|
new ThreadPoolExecutor.DiscardOldestPolicy());
|
||||||
|
|
||||||
private NodeCacheListener listener;
|
private NodeCacheListener listener;
|
||||||
private final String groupId;
|
|
||||||
private final String dataId;
|
|
||||||
private final String path;
|
private final String path;
|
||||||
|
|
||||||
private CuratorFramework zkClient = null;
|
private CuratorFramework zkClient = null;
|
||||||
private NodeCache nodeCache = null;
|
private NodeCache nodeCache = null;
|
||||||
|
|
||||||
public ZookeeperDataSource(final String serverAddr, final String groupId, final String dataId,
|
public ZookeeperDataSource(final String serverAddr, final String path, ConfigParser<String, T> parser) {
|
||||||
ConfigParser<String, T> parser) {
|
|
||||||
super(parser);
|
super(parser);
|
||||||
if (StringUtil.isBlank(serverAddr) || StringUtil.isBlank(groupId) || StringUtil.isBlank(dataId)) {
|
if (StringUtil.isBlank(serverAddr) || StringUtil.isBlank(path)) {
|
||||||
throw new IllegalArgumentException(String.format("Bad argument: serverAddr=[%s], groupId=[%s], dataId=[%s]",
|
throw new IllegalArgumentException(String.format("Bad argument: serverAddr=[%s], path=[%s]", serverAddr, path));
|
||||||
serverAddr, groupId, dataId));
|
|
||||||
}
|
}
|
||||||
this.groupId = groupId;
|
this.path = path;
|
||||||
this.dataId = dataId;
|
|
||||||
this.path = getPath(groupId, dataId);
|
|
||||||
|
|
||||||
init(serverAddr);
|
init(serverAddr);
|
||||||
}
|
}
|
||||||
|
|
||||||
public ZookeeperDataSource(final String serverAddr, final String path, ConfigParser<String, T> parser) {
|
/**
|
||||||
|
* This constructor is Nacos-style.
|
||||||
|
*/
|
||||||
|
public ZookeeperDataSource(final String serverAddr, final String groupId, final String dataId,
|
||||||
|
ConfigParser<String, T> parser) {
|
||||||
super(parser);
|
super(parser);
|
||||||
if (StringUtil.isBlank(serverAddr) || StringUtil.isBlank(path)) {
|
if (StringUtil.isBlank(serverAddr) || StringUtil.isBlank(groupId) || StringUtil.isBlank(dataId)) {
|
||||||
throw new IllegalArgumentException(String.format("Bad argument: serverAddr=[%s], path=[%s]",
|
throw new IllegalArgumentException(String.format("Bad argument: serverAddr=[%s], groupId=[%s], dataId=[%s]", serverAddr, groupId, dataId));
|
||||||
serverAddr, path));
|
|
||||||
}
|
}
|
||||||
this.path = path;
|
this.path = getPath(groupId, dataId);
|
||||||
this.groupId = null;
|
|
||||||
this.dataId = null;
|
|
||||||
|
|
||||||
init(serverAddr);
|
init(serverAddr);
|
||||||
}
|
}
|
||||||
|
|
@ -90,7 +86,7 @@ public class ZookeeperDataSource<T> extends AbstractDataSource<String, T> {
|
||||||
|
|
||||||
this.listener = new NodeCacheListener() {
|
this.listener = new NodeCacheListener() {
|
||||||
@Override
|
@Override
|
||||||
public void nodeChanged() throws Exception {
|
public void nodeChanged() {
|
||||||
String configInfo = null;
|
String configInfo = null;
|
||||||
ChildData childData = nodeCache.getCurrentData();
|
ChildData childData = nodeCache.getCurrentData();
|
||||||
if (null != childData && childData.getData() != null) {
|
if (null != childData && childData.getData() != null) {
|
||||||
|
|
@ -98,7 +94,7 @@ public class ZookeeperDataSource<T> extends AbstractDataSource<String, T> {
|
||||||
configInfo = new String(childData.getData());
|
configInfo = new String(childData.getData());
|
||||||
}
|
}
|
||||||
RecordLog.info(String.format("[ZookeeperDataSource] New property value received for (%s, %s): %s",
|
RecordLog.info(String.format("[ZookeeperDataSource] New property value received for (%s, %s): %s",
|
||||||
serverAddr, path, configInfo));
|
serverAddr, path, configInfo));
|
||||||
T newValue = ZookeeperDataSource.this.parser.parse(configInfo);
|
T newValue = ZookeeperDataSource.this.parser.parse(configInfo);
|
||||||
// Update the new value to the property.
|
// Update the new value to the property.
|
||||||
getProperty().updateValue(newValue);
|
getProperty().updateValue(newValue);
|
||||||
|
|
@ -146,17 +142,6 @@ public class ZookeeperDataSource<T> extends AbstractDataSource<String, T> {
|
||||||
}
|
}
|
||||||
|
|
||||||
private String getPath(String groupId, String dataId) {
|
private String getPath(String groupId, String dataId) {
|
||||||
String path = "";
|
return String.format("/%s/%s", groupId, dataId);
|
||||||
if (groupId.startsWith("/")) {
|
|
||||||
path += groupId;
|
|
||||||
} else {
|
|
||||||
path += "/" + groupId;
|
|
||||||
}
|
|
||||||
if (dataId.startsWith("/")) {
|
|
||||||
path += dataId;
|
|
||||||
} else {
|
|
||||||
path += "/" + dataId;
|
|
||||||
}
|
|
||||||
return path;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -0,0 +1,84 @@
|
||||||
|
package com.alibaba.csp.sentinel.datasource.zookeeper;
|
||||||
|
|
||||||
|
import java.util.Collections;
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
|
import com.alibaba.csp.sentinel.datasource.ConfigParser;
|
||||||
|
import com.alibaba.csp.sentinel.datasource.DataSource;
|
||||||
|
import com.alibaba.csp.sentinel.slots.block.RuleConstant;
|
||||||
|
import com.alibaba.csp.sentinel.slots.block.flow.FlowRule;
|
||||||
|
import com.alibaba.csp.sentinel.slots.block.flow.FlowRuleManager;
|
||||||
|
import com.alibaba.fastjson.JSON;
|
||||||
|
import com.alibaba.fastjson.TypeReference;
|
||||||
|
|
||||||
|
import org.apache.curator.framework.CuratorFramework;
|
||||||
|
import org.apache.curator.framework.CuratorFrameworkFactory;
|
||||||
|
import org.apache.curator.retry.ExponentialBackoffRetry;
|
||||||
|
import org.apache.curator.test.TestingServer;
|
||||||
|
import org.apache.zookeeper.CreateMode;
|
||||||
|
import org.apache.zookeeper.data.Stat;
|
||||||
|
import org.junit.Test;
|
||||||
|
|
||||||
|
import static org.junit.Assert.*;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @author Eric Zhao
|
||||||
|
*/
|
||||||
|
public class ZookeeperDataSourceTest {
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testZooKeeperDataSource() throws Exception {
|
||||||
|
TestingServer server = new TestingServer(21812);
|
||||||
|
server.start();
|
||||||
|
|
||||||
|
final String remoteAddress = server.getConnectString();
|
||||||
|
final String path = "/sentinel-zk-ds-demo/flow-HK";
|
||||||
|
|
||||||
|
DataSource<String, List<FlowRule>> flowRuleDataSource = new ZookeeperDataSource<List<FlowRule>>(remoteAddress, path,
|
||||||
|
new ConfigParser<String, List<FlowRule>>() {
|
||||||
|
@Override
|
||||||
|
public List<FlowRule> parse(String source) {
|
||||||
|
return JSON.parseObject(source, new TypeReference<List<FlowRule>>() {});
|
||||||
|
}
|
||||||
|
});
|
||||||
|
FlowRuleManager.register2Property(flowRuleDataSource.getProperty());
|
||||||
|
|
||||||
|
CuratorFramework zkClient = CuratorFrameworkFactory.newClient(remoteAddress,
|
||||||
|
new ExponentialBackoffRetry(3, 1000));
|
||||||
|
zkClient.start();
|
||||||
|
Stat stat = zkClient.checkExists().forPath(path);
|
||||||
|
if (stat == null) {
|
||||||
|
zkClient.create().creatingParentContainersIfNeeded().withMode(CreateMode.PERSISTENT).forPath(path, null);
|
||||||
|
}
|
||||||
|
|
||||||
|
final String resourceName = "HK";
|
||||||
|
publishThenTestFor(zkClient, path, resourceName, 10);
|
||||||
|
publishThenTestFor(zkClient, path, resourceName, 15);
|
||||||
|
|
||||||
|
zkClient.close();
|
||||||
|
server.stop();
|
||||||
|
}
|
||||||
|
|
||||||
|
private void publishThenTestFor(CuratorFramework zkClient, String path, String resourceName, long count) throws Exception {
|
||||||
|
FlowRule rule = new FlowRule().setResource(resourceName)
|
||||||
|
.setLimitApp("default")
|
||||||
|
.as(FlowRule.class)
|
||||||
|
.setCount(count)
|
||||||
|
.setGrade(RuleConstant.FLOW_GRADE_QPS);
|
||||||
|
String ruleString = JSON.toJSONString(Collections.singletonList(rule));
|
||||||
|
zkClient.setData().forPath(path, ruleString.getBytes());
|
||||||
|
|
||||||
|
Thread.sleep(5000);
|
||||||
|
|
||||||
|
List<FlowRule> rules = FlowRuleManager.getRules();
|
||||||
|
assertTrue(rules != null && !rules.isEmpty());
|
||||||
|
boolean exists = false;
|
||||||
|
for (FlowRule r : rules) {
|
||||||
|
if (resourceName.equals(r.getResource())) {
|
||||||
|
exists = true;
|
||||||
|
assertEquals(count, new Double(r.getCount()).longValue());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
assertTrue(exists);
|
||||||
|
}
|
||||||
|
}
|
||||||
Loading…
Reference in New Issue