Reuse connections of the same address in ZooKeeper data-source (#788)

* Fix the problem that too many connections (zkClient) are established
This commit is contained in:
Lin.Liang 2019-06-27 13:39:56 +08:00 committed by Eric Zhao
parent dca4440d40
commit ba146765e3
2 changed files with 164 additions and 37 deletions

View File

@ -1,17 +1,10 @@
package com.alibaba.csp.sentinel.datasource.zookeeper; package com.alibaba.csp.sentinel.datasource.zookeeper;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import com.alibaba.csp.sentinel.concurrent.NamedThreadFactory; import com.alibaba.csp.sentinel.concurrent.NamedThreadFactory;
import com.alibaba.csp.sentinel.datasource.AbstractDataSource; import com.alibaba.csp.sentinel.datasource.AbstractDataSource;
import com.alibaba.csp.sentinel.datasource.Converter; import com.alibaba.csp.sentinel.datasource.Converter;
import com.alibaba.csp.sentinel.log.RecordLog; import com.alibaba.csp.sentinel.log.RecordLog;
import com.alibaba.csp.sentinel.util.StringUtil; import com.alibaba.csp.sentinel.util.StringUtil;
import org.apache.curator.framework.AuthInfo; import org.apache.curator.framework.AuthInfo;
import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.framework.CuratorFrameworkFactory;
@ -19,8 +12,15 @@ import org.apache.curator.framework.recipes.cache.ChildData;
import org.apache.curator.framework.recipes.cache.NodeCache; import org.apache.curator.framework.recipes.cache.NodeCache;
import org.apache.curator.framework.recipes.cache.NodeCacheListener; import org.apache.curator.framework.recipes.cache.NodeCacheListener;
import org.apache.curator.retry.ExponentialBackoffRetry; import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.data.Stat; import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/** /**
* A read-only {@code DataSource} with ZooKeeper backend. * A read-only {@code DataSource} with ZooKeeper backend.
@ -32,6 +32,10 @@ public class ZookeeperDataSource<T> extends AbstractDataSource<String, T> {
private static final int RETRY_TIMES = 3; private static final int RETRY_TIMES = 3;
private static final int SLEEP_TIME = 1000; private static final int SLEEP_TIME = 1000;
private static volatile Map<String, CuratorFramework> zkClientMap = new HashMap<>();
private static final Object lock = new Object();
private final ExecutorService pool = new ThreadPoolExecutor(1, 1, 0, TimeUnit.MILLISECONDS, private final ExecutorService pool = new ThreadPoolExecutor(1, 1, 0, TimeUnit.MILLISECONDS,
new ArrayBlockingQueue<Runnable>(1), new NamedThreadFactory("sentinel-zookeeper-ds-update"), new ArrayBlockingQueue<Runnable>(1), new NamedThreadFactory("sentinel-zookeeper-ds-update"),
new ThreadPoolExecutor.DiscardOldestPolicy()); new ThreadPoolExecutor.DiscardOldestPolicy());
@ -116,16 +120,33 @@ public class ZookeeperDataSource<T> extends AbstractDataSource<String, T> {
} }
}; };
if (authInfos == null || authInfos.size() == 0) { String zkKey = getZkKey(serverAddr, authInfos);
this.zkClient = CuratorFrameworkFactory.newClient(serverAddr, new ExponentialBackoffRetry(SLEEP_TIME, RETRY_TIMES)); if (zkClientMap.containsKey(zkKey)) {
this.zkClient = zkClientMap.get(zkKey);
} else { } else {
this.zkClient = CuratorFrameworkFactory.builder(). synchronized (lock) {
if (!zkClientMap.containsKey(zkKey)) {
CuratorFramework zc = null;
if (authInfos == null || authInfos.size() == 0) {
zc = CuratorFrameworkFactory.newClient(serverAddr, new ExponentialBackoffRetry(SLEEP_TIME, RETRY_TIMES));
} else {
zc = CuratorFrameworkFactory.builder().
connectString(serverAddr). connectString(serverAddr).
retryPolicy(new ExponentialBackoffRetry(SLEEP_TIME, RETRY_TIMES)). retryPolicy(new ExponentialBackoffRetry(SLEEP_TIME, RETRY_TIMES)).
authorization(authInfos). authorization(authInfos).
build(); build();
} }
this.zkClient = zc;
this.zkClient.start(); this.zkClient.start();
Map<String, CuratorFramework> newZkClientMap = new HashMap<>(zkClientMap.size());
newZkClientMap.putAll(zkClientMap);
newZkClientMap.put(zkKey, zc);
zkClientMap = newZkClientMap;
} else {
this.zkClient = zkClientMap.get(zkKey);
}
}
}
this.nodeCache = new NodeCache(this.zkClient, this.path); this.nodeCache = new NodeCache(this.zkClient, this.path);
this.nodeCache.getListenable().addListener(this.listener, this.pool); this.nodeCache.getListenable().addListener(this.listener, this.pool);
@ -165,4 +186,31 @@ public class ZookeeperDataSource<T> extends AbstractDataSource<String, T> {
private String getPath(String groupId, String dataId) { private String getPath(String groupId, String dataId) {
return String.format("/%s/%s", groupId, dataId); return String.format("/%s/%s", groupId, dataId);
} }
private String getZkKey(final String serverAddr, final List<AuthInfo> authInfos) {
if (authInfos == null || authInfos.size() == 0) {
return serverAddr;
}
StringBuilder builder = new StringBuilder(64);
builder.append(serverAddr).append(getAuthInfosKey(authInfos));
return builder.toString();
}
private String getAuthInfosKey(List<AuthInfo> authInfos) {
StringBuilder builder = new StringBuilder(32);
for (AuthInfo authInfo : authInfos) {
if (authInfo == null) {
builder.append("{}");
} else {
builder.append("{" + "sc=" + authInfo.getScheme() + ",au=" + Arrays.toString(authInfo.getAuth()) + "}");
}
}
return builder.toString();
}
protected CuratorFramework getZkClient() {
return this.zkClient;
}
} }

View File

@ -3,6 +3,7 @@ package com.alibaba.csp.sentinel.datasource.zookeeper;
import com.alibaba.csp.sentinel.datasource.Converter; import com.alibaba.csp.sentinel.datasource.Converter;
import com.alibaba.csp.sentinel.datasource.ReadableDataSource; import com.alibaba.csp.sentinel.datasource.ReadableDataSource;
import com.alibaba.csp.sentinel.slots.block.RuleConstant; import com.alibaba.csp.sentinel.slots.block.RuleConstant;
import com.alibaba.csp.sentinel.slots.block.degrade.DegradeRule;
import com.alibaba.csp.sentinel.slots.block.flow.FlowRule; import com.alibaba.csp.sentinel.slots.block.flow.FlowRule;
import com.alibaba.csp.sentinel.slots.block.flow.FlowRuleManager; import com.alibaba.csp.sentinel.slots.block.flow.FlowRuleManager;
import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSON;
@ -18,6 +19,7 @@ import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.data.Id; import org.apache.zookeeper.data.Id;
import org.apache.zookeeper.data.Stat; import org.apache.zookeeper.data.Stat;
import org.apache.zookeeper.server.auth.DigestAuthenticationProvider; import org.apache.zookeeper.server.auth.DigestAuthenticationProvider;
import org.junit.Assert;
import org.junit.Test; import org.junit.Test;
import java.util.Collections; import java.util.Collections;
@ -46,7 +48,8 @@ public class ZookeeperDataSourceTest {
new Converter<String, List<FlowRule>>() { new Converter<String, List<FlowRule>>() {
@Override @Override
public List<FlowRule> convert(String source) { public List<FlowRule> convert(String source) {
return JSON.parseObject(source, new TypeReference<List<FlowRule>>() {}); return JSON.parseObject(source, new TypeReference<List<FlowRule>>() {
});
} }
}); });
FlowRuleManager.register2Property(flowRuleDataSource.getProperty()); FlowRuleManager.register2Property(flowRuleDataSource.getProperty());
@ -67,6 +70,9 @@ public class ZookeeperDataSourceTest {
server.stop(); server.stop();
} }
@Test @Test
public void testZooKeeperDataSourceAuthorization() throws Exception { public void testZooKeeperDataSourceAuthorization() throws Exception {
TestingServer server = new TestingServer(21812); TestingServer server = new TestingServer(21812);
@ -142,4 +148,77 @@ public class ZookeeperDataSourceTest {
} }
assertTrue(exists); assertTrue(exists);
} }
/**
* Test whether different dataSources can share the same zkClient when the connection parameters are the same.
* @throws Exception
*/
@Test
public void testZooKeeperDataSourceSameZkClient() throws Exception {
TestingServer server = new TestingServer(21813);
server.start();
final String remoteAddress = server.getConnectString();
final String flowPath = "/sentinel-zk-ds-demo/flow-HK";
final String degradePath = "/sentinel-zk-ds-demo/degrade-HK";
ZookeeperDataSource<List<FlowRule>> flowRuleZkDataSource = new ZookeeperDataSource<>(remoteAddress, flowPath,
new Converter<String, List<FlowRule>>() {
@Override
public List<FlowRule> convert(String source) {
return JSON.parseObject(source, new TypeReference<List<FlowRule>>() {
});
}
});
ZookeeperDataSource<List<DegradeRule>> degradeRuleZkDataSource = new ZookeeperDataSource<>(remoteAddress, degradePath,
new Converter<String, List<DegradeRule>>() {
@Override
public List<DegradeRule> convert(String source) {
return JSON.parseObject(source, new TypeReference<List<DegradeRule>>() {
});
}
});
Assert.assertTrue(flowRuleZkDataSource.getZkClient() == degradeRuleZkDataSource.getZkClient());
final String groupId = "sentinel-zk-ds-demo";
final String flowDataId = "flow-HK";
final String degradeDataId = "degrade-HK";
final String scheme = "digest";
final String auth = "root:123456";
AuthInfo authInfo = new AuthInfo(scheme, auth.getBytes());
List<AuthInfo> authInfoList = Collections.singletonList(authInfo);
ZookeeperDataSource<List<FlowRule>> flowRuleZkAutoDataSource = new ZookeeperDataSource<List<FlowRule>>(remoteAddress,
authInfoList, groupId, flowDataId,
new Converter<String, List<FlowRule>>() {
@Override
public List<FlowRule> convert(String source) {
return JSON.parseObject(source, new TypeReference<List<FlowRule>>() {
});
}
});
ZookeeperDataSource<List<DegradeRule>> degradeRuleZkAutoDataSource = new ZookeeperDataSource<List<DegradeRule>>(remoteAddress,
authInfoList, groupId, degradeDataId,
new Converter<String, List<DegradeRule>>() {
@Override
public List<DegradeRule> convert(String source) {
return JSON.parseObject(source, new TypeReference<List<DegradeRule>>() {
});
}
});
Assert.assertTrue(flowRuleZkAutoDataSource.getZkClient() == degradeRuleZkAutoDataSource.getZkClient());
server.stop();
}
} }