From ba146765e36b45b9f109bc9779bf2c2faf5d0bbe Mon Sep 17 00:00:00 2001 From: "Lin.Liang" <546648227@qq.com> Date: Thu, 27 Jun 2019 13:39:56 +0800 Subject: [PATCH] Reuse connections of the same address in ZooKeeper data-source (#788) * Fix the problem that too many connections (zkClient) are established --- .../zookeeper/ZookeeperDataSource.java | 86 ++++++++++--- .../zookeeper/ZookeeperDataSourceTest.java | 115 +++++++++++++++--- 2 files changed, 164 insertions(+), 37 deletions(-) diff --git a/sentinel-extension/sentinel-datasource-zookeeper/src/main/java/com/alibaba/csp/sentinel/datasource/zookeeper/ZookeeperDataSource.java b/sentinel-extension/sentinel-datasource-zookeeper/src/main/java/com/alibaba/csp/sentinel/datasource/zookeeper/ZookeeperDataSource.java index 24c3557b..5fb243bb 100644 --- a/sentinel-extension/sentinel-datasource-zookeeper/src/main/java/com/alibaba/csp/sentinel/datasource/zookeeper/ZookeeperDataSource.java +++ b/sentinel-extension/sentinel-datasource-zookeeper/src/main/java/com/alibaba/csp/sentinel/datasource/zookeeper/ZookeeperDataSource.java @@ -1,17 +1,10 @@ package com.alibaba.csp.sentinel.datasource.zookeeper; -import java.util.List; -import java.util.concurrent.ArrayBlockingQueue; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; - import com.alibaba.csp.sentinel.concurrent.NamedThreadFactory; import com.alibaba.csp.sentinel.datasource.AbstractDataSource; import com.alibaba.csp.sentinel.datasource.Converter; import com.alibaba.csp.sentinel.log.RecordLog; import com.alibaba.csp.sentinel.util.StringUtil; - import org.apache.curator.framework.AuthInfo; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; @@ -19,8 +12,15 @@ import org.apache.curator.framework.recipes.cache.ChildData; import org.apache.curator.framework.recipes.cache.NodeCache; import org.apache.curator.framework.recipes.cache.NodeCacheListener; import org.apache.curator.retry.ExponentialBackoffRetry; -import org.apache.zookeeper.CreateMode; -import org.apache.zookeeper.data.Stat; + +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; /** * A read-only {@code DataSource} with ZooKeeper backend. @@ -32,9 +32,13 @@ public class ZookeeperDataSource extends AbstractDataSource { private static final int RETRY_TIMES = 3; private static final int SLEEP_TIME = 1000; + private static volatile Map zkClientMap = new HashMap<>(); + private static final Object lock = new Object(); + + private final ExecutorService pool = new ThreadPoolExecutor(1, 1, 0, TimeUnit.MILLISECONDS, - new ArrayBlockingQueue(1), new NamedThreadFactory("sentinel-zookeeper-ds-update"), - new ThreadPoolExecutor.DiscardOldestPolicy()); + new ArrayBlockingQueue(1), new NamedThreadFactory("sentinel-zookeeper-ds-update"), + new ThreadPoolExecutor.DiscardOldestPolicy()); private NodeCacheListener listener; private final String path; @@ -116,16 +120,33 @@ public class ZookeeperDataSource extends AbstractDataSource { } }; - if (authInfos == null || authInfos.size() == 0) { - this.zkClient = CuratorFrameworkFactory.newClient(serverAddr, new ExponentialBackoffRetry(SLEEP_TIME, RETRY_TIMES)); + String zkKey = getZkKey(serverAddr, authInfos); + if (zkClientMap.containsKey(zkKey)) { + this.zkClient = zkClientMap.get(zkKey); } else { - this.zkClient = CuratorFrameworkFactory.builder(). - connectString(serverAddr). - retryPolicy(new ExponentialBackoffRetry(SLEEP_TIME, RETRY_TIMES)). - authorization(authInfos). - build(); + synchronized (lock) { + if (!zkClientMap.containsKey(zkKey)) { + CuratorFramework zc = null; + if (authInfos == null || authInfos.size() == 0) { + zc = CuratorFrameworkFactory.newClient(serverAddr, new ExponentialBackoffRetry(SLEEP_TIME, RETRY_TIMES)); + } else { + zc = CuratorFrameworkFactory.builder(). + connectString(serverAddr). + retryPolicy(new ExponentialBackoffRetry(SLEEP_TIME, RETRY_TIMES)). + authorization(authInfos). + build(); + } + this.zkClient = zc; + this.zkClient.start(); + Map newZkClientMap = new HashMap<>(zkClientMap.size()); + newZkClientMap.putAll(zkClientMap); + newZkClientMap.put(zkKey, zc); + zkClientMap = newZkClientMap; + } else { + this.zkClient = zkClientMap.get(zkKey); + } + } } - this.zkClient.start(); this.nodeCache = new NodeCache(this.zkClient, this.path); this.nodeCache.getListenable().addListener(this.listener, this.pool); @@ -165,4 +186,31 @@ public class ZookeeperDataSource extends AbstractDataSource { private String getPath(String groupId, String dataId) { return String.format("/%s/%s", groupId, dataId); } + + private String getZkKey(final String serverAddr, final List authInfos) { + if (authInfos == null || authInfos.size() == 0) { + return serverAddr; + } + StringBuilder builder = new StringBuilder(64); + builder.append(serverAddr).append(getAuthInfosKey(authInfos)); + return builder.toString(); + } + + private String getAuthInfosKey(List authInfos) { + StringBuilder builder = new StringBuilder(32); + for (AuthInfo authInfo : authInfos) { + if (authInfo == null) { + builder.append("{}"); + } else { + builder.append("{" + "sc=" + authInfo.getScheme() + ",au=" + Arrays.toString(authInfo.getAuth()) + "}"); + } + } + return builder.toString(); + } + + protected CuratorFramework getZkClient() { + return this.zkClient; + } + + } diff --git a/sentinel-extension/sentinel-datasource-zookeeper/src/test/java/com/alibaba/csp/sentinel/datasource/zookeeper/ZookeeperDataSourceTest.java b/sentinel-extension/sentinel-datasource-zookeeper/src/test/java/com/alibaba/csp/sentinel/datasource/zookeeper/ZookeeperDataSourceTest.java index 52867cc3..f9c62e65 100644 --- a/sentinel-extension/sentinel-datasource-zookeeper/src/test/java/com/alibaba/csp/sentinel/datasource/zookeeper/ZookeeperDataSourceTest.java +++ b/sentinel-extension/sentinel-datasource-zookeeper/src/test/java/com/alibaba/csp/sentinel/datasource/zookeeper/ZookeeperDataSourceTest.java @@ -3,6 +3,7 @@ package com.alibaba.csp.sentinel.datasource.zookeeper; import com.alibaba.csp.sentinel.datasource.Converter; import com.alibaba.csp.sentinel.datasource.ReadableDataSource; import com.alibaba.csp.sentinel.slots.block.RuleConstant; +import com.alibaba.csp.sentinel.slots.block.degrade.DegradeRule; import com.alibaba.csp.sentinel.slots.block.flow.FlowRule; import com.alibaba.csp.sentinel.slots.block.flow.FlowRuleManager; import com.alibaba.fastjson.JSON; @@ -18,6 +19,7 @@ import org.apache.zookeeper.data.ACL; import org.apache.zookeeper.data.Id; import org.apache.zookeeper.data.Stat; import org.apache.zookeeper.server.auth.DigestAuthenticationProvider; +import org.junit.Assert; import org.junit.Test; import java.util.Collections; @@ -43,16 +45,17 @@ public class ZookeeperDataSourceTest { final String path = "/sentinel-zk-ds-demo/flow-HK"; ReadableDataSource> flowRuleDataSource = new ZookeeperDataSource>(remoteAddress, path, - new Converter>() { - @Override - public List convert(String source) { - return JSON.parseObject(source, new TypeReference>() {}); - } - }); + new Converter>() { + @Override + public List convert(String source) { + return JSON.parseObject(source, new TypeReference>() { + }); + } + }); FlowRuleManager.register2Property(flowRuleDataSource.getProperty()); CuratorFramework zkClient = CuratorFrameworkFactory.newClient(remoteAddress, - new ExponentialBackoffRetry(3, 1000)); + new ExponentialBackoffRetry(3, 1000)); zkClient.start(); Stat stat = zkClient.checkExists().forPath(path); if (stat == null) { @@ -67,6 +70,9 @@ public class ZookeeperDataSourceTest { server.stop(); } + + + @Test public void testZooKeeperDataSourceAuthorization() throws Exception { TestingServer server = new TestingServer(21812); @@ -116,21 +122,21 @@ public class ZookeeperDataSourceTest { private void publishThenTestFor(CuratorFramework zkClient, String path, String resourceName, long count) throws Exception { FlowRule rule = new FlowRule().setResource(resourceName) - .setLimitApp("default") - .as(FlowRule.class) - .setCount(count) - .setGrade(RuleConstant.FLOW_GRADE_QPS); + .setLimitApp("default") + .as(FlowRule.class) + .setCount(count) + .setGrade(RuleConstant.FLOW_GRADE_QPS); String ruleString = JSON.toJSONString(Collections.singletonList(rule)); zkClient.setData().forPath(path, ruleString.getBytes()); await().timeout(5, TimeUnit.SECONDS) - .until(new Callable() { - @Override - public Boolean call() throws Exception { - List rules = FlowRuleManager.getRules(); - return rules != null && !rules.isEmpty(); - } - }); + .until(new Callable() { + @Override + public Boolean call() throws Exception { + List rules = FlowRuleManager.getRules(); + return rules != null && !rules.isEmpty(); + } + }); List rules = FlowRuleManager.getRules(); boolean exists = false; @@ -142,4 +148,77 @@ public class ZookeeperDataSourceTest { } assertTrue(exists); } + + + /** + * Test whether different dataSources can share the same zkClient when the connection parameters are the same. + * @throws Exception + */ + @Test + public void testZooKeeperDataSourceSameZkClient() throws Exception { + TestingServer server = new TestingServer(21813); + server.start(); + + final String remoteAddress = server.getConnectString(); + final String flowPath = "/sentinel-zk-ds-demo/flow-HK"; + final String degradePath = "/sentinel-zk-ds-demo/degrade-HK"; + + + ZookeeperDataSource> flowRuleZkDataSource = new ZookeeperDataSource<>(remoteAddress, flowPath, + new Converter>() { + @Override + public List convert(String source) { + return JSON.parseObject(source, new TypeReference>() { + }); + } + }); + ZookeeperDataSource> degradeRuleZkDataSource = new ZookeeperDataSource<>(remoteAddress, degradePath, + new Converter>() { + @Override + public List convert(String source) { + return JSON.parseObject(source, new TypeReference>() { + }); + } + }); + + + Assert.assertTrue(flowRuleZkDataSource.getZkClient() == degradeRuleZkDataSource.getZkClient()); + + + final String groupId = "sentinel-zk-ds-demo"; + final String flowDataId = "flow-HK"; + final String degradeDataId = "degrade-HK"; + final String scheme = "digest"; + final String auth = "root:123456"; + AuthInfo authInfo = new AuthInfo(scheme, auth.getBytes()); + List authInfoList = Collections.singletonList(authInfo); + + + ZookeeperDataSource> flowRuleZkAutoDataSource = new ZookeeperDataSource>(remoteAddress, + authInfoList, groupId, flowDataId, + new Converter>() { + @Override + public List convert(String source) { + return JSON.parseObject(source, new TypeReference>() { + }); + } + }); + + ZookeeperDataSource> degradeRuleZkAutoDataSource = new ZookeeperDataSource>(remoteAddress, + authInfoList, groupId, degradeDataId, + new Converter>() { + @Override + public List convert(String source) { + return JSON.parseObject(source, new TypeReference>() { + }); + } + }); + + Assert.assertTrue(flowRuleZkAutoDataSource.getZkClient() == degradeRuleZkAutoDataSource.getZkClient()); + + server.stop(); + } + + + } \ No newline at end of file