Support Redis cluster mode in Redis data-source extension (#1751)

This commit is contained in:
liqiangz 2021-03-09 09:59:33 +08:00 committed by GitHub
parent 217cccd704
commit 3e438b3dba
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 337 additions and 12 deletions

View File

@ -57,6 +57,24 @@ public <T> void pushRules(List<T> rules, Converter<List<T>, String> encoder) {
}
```
Transaction can be handled in Redis Cluster when just using the same key.
An example using Lettuce Redis Cluster client:
```java
public <T> void pushRules(List<T> rules, Converter<List<T>, String> encoder) {
RedisAdvancedClusterCommands<String, String> subCommands = client.connect().sync();
int slot = SlotHash.getSlot(ruleKey);
NodeSelection<String, String> nodes = subCommands.nodes((n)->n.hasSlot(slot));
RedisCommands<String, String> commands = nodes.commands(0);
String value = encoder.convert(rules);
commands.multi();
commands.set(ruleKey, value);
commands.publish(channel, value);
commands.exec();
}
```
## How to build RedisConnectionConfig
### Build with Redis standalone mode
@ -79,3 +97,11 @@ RedisConnectionConfig config = RedisConnectionConfig.builder()
.withRedisSentinel("redisSentinelServer2",5001)
.withRedisSentinelMasterId("redisSentinelMasterId").build();
```
### Build with Redis Cluster mode
```java
RedisConnectionConfig config = RedisConnectionConfig.builder()
.withRedisCluster("redisSentinelServer1",5000)
.withRedisCluster("redisSentinelServer2",5001).build();
```

View File

@ -26,11 +26,16 @@ import com.alibaba.csp.sentinel.util.StringUtil;
import io.lettuce.core.RedisClient;
import io.lettuce.core.RedisURI;
import io.lettuce.core.api.sync.RedisCommands;
import io.lettuce.core.cluster.RedisClusterClient;
import io.lettuce.core.cluster.api.sync.RedisAdvancedClusterCommands;
import io.lettuce.core.cluster.pubsub.StatefulRedisClusterPubSubConnection;
import io.lettuce.core.pubsub.RedisPubSubAdapter;
import io.lettuce.core.pubsub.StatefulRedisPubSubConnection;
import io.lettuce.core.pubsub.api.sync.RedisPubSubCommands;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
/**
@ -59,6 +64,8 @@ public class RedisDataSource<T> extends AbstractDataSource<String, T> {
private final RedisClient redisClient;
private final RedisClusterClient redisClusterClient;
private final String ruleKey;
/**
@ -75,7 +82,13 @@ public class RedisDataSource<T> extends AbstractDataSource<String, T> {
AssertUtil.notNull(connectionConfig, "Redis connection config can not be null");
AssertUtil.notEmpty(ruleKey, "Redis ruleKey can not be empty");
AssertUtil.notEmpty(channel, "Redis subscribe channel can not be empty");
this.redisClient = getRedisClient(connectionConfig);
if (connectionConfig.getRedisClusters().size() == 0) {
this.redisClient = getRedisClient(connectionConfig);
this.redisClusterClient = null;
} else {
this.redisClusterClient = getRedisClusterClient(connectionConfig);
this.redisClient = null;
}
this.ruleKey = ruleKey;
loadInitialConfig();
subscribeFromChannel(channel);
@ -96,6 +109,26 @@ public class RedisDataSource<T> extends AbstractDataSource<String, T> {
}
}
private RedisClusterClient getRedisClusterClient(RedisConnectionConfig connectionConfig) {
char[] password = connectionConfig.getPassword();
String clientName = connectionConfig.getClientName();
//If any uri is successful for connection, the others are not tried anymore
List<RedisURI> redisUris = new ArrayList<>();
for (RedisConnectionConfig config : connectionConfig.getRedisClusters()) {
RedisURI.Builder clusterRedisUriBuilder = RedisURI.builder();
clusterRedisUriBuilder.withHost(config.getHost())
.withPort(config.getPort())
.withTimeout(Duration.ofMillis(connectionConfig.getTimeout()));
//All redis nodes must have same password
if (password != null) {
clusterRedisUriBuilder.withPassword(connectionConfig.getPassword());
}
redisUris.add(clusterRedisUriBuilder.build());
}
return RedisClusterClient.create(redisUris);
}
private RedisClient getRedisStandaloneClient(RedisConnectionConfig connectionConfig) {
char[] password = connectionConfig.getPassword();
String clientName = connectionConfig.getClientName();
@ -132,11 +165,18 @@ public class RedisDataSource<T> extends AbstractDataSource<String, T> {
}
private void subscribeFromChannel(String channel) {
StatefulRedisPubSubConnection<String, String> pubSubConnection = redisClient.connectPubSub();
RedisPubSubAdapter<String, String> adapterListener = new DelegatingRedisPubSubListener();
pubSubConnection.addListener(adapterListener);
RedisPubSubCommands<String, String> sync = pubSubConnection.sync();
sync.subscribe(channel);
if (redisClient != null) {
StatefulRedisPubSubConnection<String, String> pubSubConnection = redisClient.connectPubSub();
pubSubConnection.addListener(adapterListener);
RedisPubSubCommands<String, String> sync = pubSubConnection.sync();
sync.subscribe(channel);
} else {
StatefulRedisClusterPubSubConnection<String, String> pubSubConnection = redisClusterClient.connectPubSub();
pubSubConnection.addListener(adapterListener);
RedisPubSubCommands<String, String> sync = pubSubConnection.sync();
sync.subscribe(channel);
}
}
private void loadInitialConfig() {
@ -153,16 +193,27 @@ public class RedisDataSource<T> extends AbstractDataSource<String, T> {
@Override
public String readSource() {
if (this.redisClient == null) {
throw new IllegalStateException("Redis client has not been initialized or error occurred");
if (this.redisClient == null && this.redisClusterClient == null) {
throw new IllegalStateException("Redis client or Redis Cluster client has not been initialized or error occurred");
}
if (redisClient != null) {
RedisCommands<String, String> stringRedisCommands = redisClient.connect().sync();
return stringRedisCommands.get(ruleKey);
} else {
RedisAdvancedClusterCommands<String, String> stringRedisCommands = redisClusterClient.connect().sync();
return stringRedisCommands.get(ruleKey);
}
RedisCommands<String, String> stringRedisCommands = redisClient.connect().sync();
return stringRedisCommands.get(ruleKey);
}
@Override
public void close() {
redisClient.shutdown();
if (redisClient != null) {
redisClient.shutdown();
} else {
redisClusterClient.shutdown();
}
}
private class DelegatingRedisPubSubListener extends RedisPubSubAdapter<String, String> {

View File

@ -33,6 +33,11 @@ public class RedisConnectionConfig {
*/
public static final int DEFAULT_SENTINEL_PORT = 26379;
/**
* The default redisCluster port.
*/
public static final int DEFAULT_CLUSTER_PORT = 6379;
/**
* The default redis port.
*/
@ -51,6 +56,7 @@ public class RedisConnectionConfig {
private char[] password;
private long timeout = DEFAULT_TIMEOUT_MILLISECONDS;
private final List<RedisConnectionConfig> redisSentinels = new ArrayList<RedisConnectionConfig>();
private final List<RedisConnectionConfig> redisClusters = new ArrayList<RedisConnectionConfig>();
/**
* Default empty constructor.
@ -238,6 +244,13 @@ public class RedisConnectionConfig {
return redisSentinels;
}
/**
* @return the list of {@link RedisConnectionConfig Redis Cluster URIs}.
*/
public List<RedisConnectionConfig> getRedisClusters() {
return redisClusters;
}
@Override
public String toString() {
final StringBuilder sb = new StringBuilder();
@ -254,6 +267,10 @@ public class RedisConnectionConfig {
sb.append(", redisSentinelMasterId=").append(redisSentinelMasterId);
}
if (redisClusters.size() > 0) {
sb.append("redisClusters=").append(getRedisClusters());
}
sb.append(']');
return sb.toString();
}
@ -281,6 +298,10 @@ public class RedisConnectionConfig {
: redisURI.redisSentinelMasterId != null) {
return false;
}
if (redisClusters != null ? !redisClusters.equals(redisURI.redisClusters)
: redisURI.redisClusters != null) {
return false;
}
return !(redisSentinels != null ? !redisSentinels.equals(redisURI.redisSentinels)
: redisURI.redisSentinels != null);
@ -293,6 +314,7 @@ public class RedisConnectionConfig {
result = 31 * result + port;
result = 31 * result + database;
result = 31 * result + (redisSentinels != null ? redisSentinels.hashCode() : 0);
result = 31 * result + (redisClusters != null ? redisClusters.hashCode() : 0);
return result;
}
@ -309,6 +331,7 @@ public class RedisConnectionConfig {
private char[] password;
private long timeout = DEFAULT_TIMEOUT_MILLISECONDS;
private final List<RedisHostAndPort> redisSentinels = new ArrayList<RedisHostAndPort>();
private final List<RedisHostAndPort> redisClusters = new ArrayList<RedisHostAndPort>();
private Builder() {
}
@ -424,6 +447,63 @@ public class RedisConnectionConfig {
return this;
}
/**
* Set Cluster host. Creates a new builder.
*
* @param host the host name
* @return New builder with Cluster host/port.
*/
public static RedisConnectionConfig.Builder redisCluster(String host) {
AssertUtil.notEmpty(host, "Host must not be empty");
RedisConnectionConfig.Builder builder = RedisConnectionConfig.builder();
return builder.withRedisCluster(host);
}
/**
* Set Cluster host and port. Creates a new builder.
*
* @param host the host name
* @param port the port
* @return New builder with Cluster host/port.
*/
public static RedisConnectionConfig.Builder redisCluster(String host, int port) {
AssertUtil.notEmpty(host, "Host must not be empty");
AssertUtil.isTrue(isValidPort(port), String.format("Port out of range: %s", port));
RedisConnectionConfig.Builder builder = RedisConnectionConfig.builder();
return builder.withRedisCluster(host, port);
}
/**
* Add a withRedisCluster host to the existing builder.
*
* @param host the host name
* @return the builder
*/
public RedisConnectionConfig.Builder withRedisCluster(String host) {
return withRedisCluster(host, DEFAULT_CLUSTER_PORT);
}
/**
* Add a withRedisCluster host/port to the existing builder.
*
* @param host the host name
* @param port the port
* @return the builder
*/
public RedisConnectionConfig.Builder withRedisCluster(String host, int port) {
AssertUtil.assertState(this.host == null, "Cannot use with Redis mode.");
AssertUtil.notEmpty(host, "Host must not be empty");
AssertUtil.isTrue(isValidPort(port), String.format("Port out of range: %s", port));
redisClusters.add(RedisHostAndPort.of(host, port));
return this;
}
/**
* Adds host information to the builder. Does only affect Redis URI, cannot be used with Sentinel connections.
*
@ -544,9 +624,9 @@ public class RedisConnectionConfig {
*/
public RedisConnectionConfig build() {
if (redisSentinels.isEmpty() && StringUtil.isEmpty(host)) {
if (redisSentinels.isEmpty() && redisClusters.isEmpty() && StringUtil.isEmpty(host)) {
throw new IllegalStateException(
"Cannot build a RedisConnectionConfig. One of the following must be provided Host, Socket or "
"Cannot build a RedisConnectionConfig. One of the following must be provided Host, Socket, Cluster or "
+ "Sentinel");
}
@ -568,6 +648,11 @@ public class RedisConnectionConfig {
new RedisConnectionConfig(sentinel.getHost(), sentinel.getPort(), timeout));
}
for (RedisHostAndPort sentinel : redisClusters) {
redisURI.getRedisClusters().add(
new RedisConnectionConfig(sentinel.getHost(), sentinel.getPort(), timeout));
}
redisURI.setTimeout(timeout);
return redisURI;

View File

@ -0,0 +1,124 @@
/*
* Copyright 1999-2020 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.csp.sentinel.datasource.redis;
import com.alibaba.csp.sentinel.datasource.Converter;
import com.alibaba.csp.sentinel.datasource.ReadableDataSource;
import com.alibaba.csp.sentinel.datasource.redis.config.RedisConnectionConfig;
import com.alibaba.csp.sentinel.slots.block.flow.FlowRule;
import com.alibaba.csp.sentinel.slots.block.flow.FlowRuleManager;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.TypeReference;
import io.lettuce.core.RedisURI;
import io.lettuce.core.api.sync.RedisCommands;
import io.lettuce.core.cluster.RedisClusterClient;
import io.lettuce.core.cluster.SlotHash;
import io.lettuce.core.cluster.api.sync.NodeSelection;
import io.lettuce.core.cluster.api.sync.RedisAdvancedClusterCommands;
import org.hamcrest.Matchers;
import org.junit.*;
import java.util.List;
import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;
import static org.awaitility.Awaitility.await;
/**
* Redis redisCluster mode test cases for {@link RedisDataSource}.
*
* @author liqiangz
*/
@Ignore(value = "Before run this test, you need to set up your Redis Cluster.")
public class ClusterModeRedisDataSourceTest {
private String host = "localhost";
private int redisSentinelPort = 7000;
private final RedisClusterClient client = RedisClusterClient.create(RedisURI.Builder.redis(host, redisSentinelPort).build());
private String ruleKey = "sentinel.rules.flow.ruleKey";
private String channel = "sentinel.rules.flow.channel";
@Before
public void initData() {
Converter<String, List<FlowRule>> flowConfigParser = buildFlowConfigParser();
RedisConnectionConfig config = RedisConnectionConfig.builder()
.withRedisCluster(host, redisSentinelPort).build();
initRedisRuleData();
ReadableDataSource<String, List<FlowRule>> redisDataSource = new RedisDataSource<>(config,
ruleKey, channel, flowConfigParser);
FlowRuleManager.register2Property(redisDataSource.getProperty());
}
@Test
public void testConnectToSentinelAndPubMsgSuccess() {
int maxQueueingTimeMs = new Random().nextInt();
String flowRulesJson =
"[{\"resource\":\"test\", \"limitApp\":\"default\", \"grade\":1, \"count\":\"0.0\", \"strategy\":0, "
+ "\"refResource\":null, "
+
"\"controlBehavior\":0, \"warmUpPeriodSec\":10, \"maxQueueingTimeMs\":" + maxQueueingTimeMs
+ ", \"controller\":null}]";
RedisAdvancedClusterCommands<String, String> subCommands = client.connect().sync();
int slot = SlotHash.getSlot(ruleKey);
NodeSelection<String, String> nodes = subCommands.nodes((n) -> n.hasSlot(slot));
RedisCommands<String, String> commands = nodes.commands(0);
commands.multi();
commands.set(ruleKey, flowRulesJson);
commands.publish(channel, flowRulesJson);
commands.exec();
await().timeout(2, TimeUnit.SECONDS)
.until(new Callable<List<FlowRule>>() {
@Override
public List<FlowRule> call() throws Exception {
return FlowRuleManager.getRules();
}
}, Matchers.hasSize(1));
List<FlowRule> rules = FlowRuleManager.getRules();
Assert.assertEquals(rules.get(0).getMaxQueueingTimeMs(), maxQueueingTimeMs);
String value = subCommands.get(ruleKey);
List<FlowRule> flowRulesValuesInRedis = buildFlowConfigParser().convert(value);
Assert.assertEquals(flowRulesValuesInRedis.size(), 1);
Assert.assertEquals(flowRulesValuesInRedis.get(0).getMaxQueueingTimeMs(), maxQueueingTimeMs);
}
@After
public void clearResource() {
RedisAdvancedClusterCommands<String, String> stringRedisCommands = client.connect().sync();
stringRedisCommands.del(ruleKey);
client.shutdown();
}
private Converter<String, List<FlowRule>> buildFlowConfigParser() {
return source -> JSON.parseObject(source, new TypeReference<List<FlowRule>>() {
});
}
private void initRedisRuleData() {
String flowRulesJson =
"[{\"resource\":\"test\", \"limitApp\":\"default\", \"grade\":1, \"count\":\"0.0\", \"strategy\":0, "
+ "\"refResource\":null, "
+
"\"controlBehavior\":0, \"warmUpPeriodSec\":10, \"maxQueueingTimeMs\":500, \"controller\":null}]";
RedisAdvancedClusterCommands<String, String> stringRedisCommands = client.connect().sync();
String ok = stringRedisCommands.set(ruleKey, flowRulesJson);
Assert.assertEquals("OK", ok);
}
}

View File

@ -94,4 +94,43 @@ public class RedisConnectionConfigTest {
Assert.assertNull(redisConnectionConfig.getHost());
Assert.assertEquals(3, redisConnectionConfig.getRedisSentinels().size());
}
@Test
public void testRedisClusterDefaultPortSuccess() {
String host = "localhost";
RedisConnectionConfig redisConnectionConfig = RedisConnectionConfig.Builder.redisCluster(host)
.withPassword("211233")
.build();
Assert.assertNull(redisConnectionConfig.getHost());
Assert.assertEquals(1, redisConnectionConfig.getRedisClusters().size());
Assert.assertEquals(RedisConnectionConfig.DEFAULT_CLUSTER_PORT,
redisConnectionConfig.getRedisClusters().get(0).getPort());
}
@Test
public void testRedisClusterMoreThanOneServerSuccess() {
String host = "localhost";
String host2 = "server2";
int port1 = 1879;
int port2 = 1880;
RedisConnectionConfig redisConnectionConfig = RedisConnectionConfig.Builder.redisCluster(host, port1)
.withRedisCluster(host2, port2)
.build();
Assert.assertNull(redisConnectionConfig.getHost());
Assert.assertEquals(2, redisConnectionConfig.getRedisClusters().size());
}
@Test
public void testRedisClusterMoreThanOneDuplicateServerSuccess() {
String host = "localhost";
String host2 = "server2";
int port2 = 1879;
RedisConnectionConfig redisConnectionConfig = RedisConnectionConfig.Builder.redisCluster(host)
.withRedisCluster(host2, port2)
.withRedisCluster(host2, port2)
.withPassword("211233")
.build();
Assert.assertNull(redisConnectionConfig.getHost());
Assert.assertEquals(3, redisConnectionConfig.getRedisClusters().size());
}
}