Add DataSource integration for Redis (#102)
- This implementation uses Lettuce as the internal client, and leverages Redis pub-sub feature to implement push mode data source. (by @tigerMoon)
This commit is contained in:
parent
5edac71fef
commit
4ff2e37abb
|
|
@ -16,6 +16,7 @@
|
|||
<module>sentinel-datasource-nacos</module>
|
||||
<module>sentinel-datasource-zookeeper</module>
|
||||
<module>sentinel-datasource-apollo</module>
|
||||
<module>sentinel-datasource-redis</module>
|
||||
<module>sentinel-annotation-aspectj</module>
|
||||
</modules>
|
||||
|
||||
|
|
|
|||
|
|
@ -0,0 +1,80 @@
|
|||
# Sentinel DataSource Redis
|
||||
|
||||
Sentinel DataSource Redis provides integration with Redis. make Redis
|
||||
as dynamic rule data source of Sentinel. The data source uses push model (listener) with redis pub/sub feature.
|
||||
|
||||
**NOTE**:
|
||||
we not support redis cluster as a pub/sub dataSource now.
|
||||
|
||||
To use Sentinel DataSource Redis, you should add the following dependency:
|
||||
|
||||
```xml
|
||||
<dependency>
|
||||
<groupId>com.alibaba.csp</groupId>
|
||||
<artifactId>sentinel-datasource-redis</artifactId>
|
||||
<version>x.y.z</version>
|
||||
</dependency>
|
||||
|
||||
```
|
||||
|
||||
Then you can create an `RedisDataSource` and register to rule managers.
|
||||
For instance:
|
||||
|
||||
```java
|
||||
ReadableDataSource<String, List<FlowRule>> redisDataSource = new RedisDataSource<List<FlowRule>>(redisConnectionConfig, ruleKey, channel, flowConfigParser);
|
||||
FlowRuleManager.register2Property(redisDataSource.getProperty());
|
||||
```
|
||||
|
||||
_**redisConnectionConfig**_ : use `RedisConnectionConfig` class to build your connection config.
|
||||
|
||||
_**ruleKey**_ : when the json rule data publish. it also should save to the key for init read.
|
||||
|
||||
_**channel**_ : the channel to listen.
|
||||
|
||||
you can also create multi data source listen for different rule type.
|
||||
|
||||
you can see test cases for usage.
|
||||
|
||||
## Before start
|
||||
|
||||
RedisDataSource init config by read from redis key `ruleKey`, value store the latest rule config data.
|
||||
so you should first config your redis ruleData in back end.
|
||||
|
||||
since update redis rule data. it should simultaneously send data to `channel`.
|
||||
|
||||
you may implement like this (using Redis transaction):
|
||||
|
||||
```
|
||||
|
||||
MULTI
|
||||
PUBLISH channel value
|
||||
SET ruleKey value
|
||||
EXEC
|
||||
|
||||
```
|
||||
|
||||
|
||||
## How to build RedisConnectionConfig
|
||||
|
||||
|
||||
### Build with redis standLone mode
|
||||
|
||||
```java
|
||||
RedisConnectionConfig config = RedisConnectionConfig.builder()
|
||||
.withHost("localhost")
|
||||
.withPort(6379)
|
||||
.withPassword("pwd")
|
||||
.withDataBase(2)
|
||||
.build();
|
||||
|
||||
```
|
||||
|
||||
|
||||
### Build with redis sentinel mode
|
||||
|
||||
```java
|
||||
RedisConnectionConfig config = RedisConnectionConfig.builder()
|
||||
.withRedisSentinel("redisSentinelServer1",5000)
|
||||
.withRedisSentinel("redisSentinelServer2",5001)
|
||||
.withRedisSentinelMasterId("redisSentinelMasterId").build();
|
||||
```
|
||||
|
|
@ -0,0 +1,47 @@
|
|||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<project xmlns="http://maven.apache.org/POM/4.0.0"
|
||||
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
||||
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
|
||||
<parent>
|
||||
<artifactId>sentinel-extension</artifactId>
|
||||
<groupId>com.alibaba.csp</groupId>
|
||||
<version>0.2.0-SNAPSHOT</version>
|
||||
</parent>
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
|
||||
<artifactId>sentinel-datasource-redis</artifactId>
|
||||
<packaging>jar</packaging>
|
||||
|
||||
<properties>
|
||||
<lettuce.version>5.0.1.RELEASE</lettuce.version>
|
||||
<redis.mock.version>0.1.6</redis.mock.version>
|
||||
</properties>
|
||||
|
||||
<dependencies>
|
||||
<dependency>
|
||||
<groupId>io.lettuce</groupId>
|
||||
<artifactId>lettuce-core</artifactId>
|
||||
<version>${lettuce.version}</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.alibaba.csp</groupId>
|
||||
<artifactId>sentinel-datasource-extension</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>junit</groupId>
|
||||
<artifactId>junit</artifactId>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.alibaba</groupId>
|
||||
<artifactId>fastjson</artifactId>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>ai.grakn</groupId>
|
||||
<artifactId>redis-mock</artifactId>
|
||||
<version>${redis.mock.version}</version>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
</project>
|
||||
|
|
@ -0,0 +1,166 @@
|
|||
/*
|
||||
* Copyright 1999-2018 Alibaba Group Holding Ltd.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package com.alibaba.csp.sentinel.datasource.redis;
|
||||
|
||||
import com.alibaba.csp.sentinel.datasource.AbstractDataSource;
|
||||
import com.alibaba.csp.sentinel.datasource.Converter;
|
||||
import com.alibaba.csp.sentinel.datasource.redis.config.RedisConnectionConfig;
|
||||
import com.alibaba.csp.sentinel.datasource.redis.util.AssertUtil;
|
||||
import com.alibaba.csp.sentinel.log.RecordLog;
|
||||
import com.alibaba.csp.sentinel.util.StringUtil;
|
||||
import io.lettuce.core.RedisClient;
|
||||
import io.lettuce.core.RedisURI;
|
||||
import io.lettuce.core.api.sync.RedisCommands;
|
||||
import io.lettuce.core.pubsub.RedisPubSubAdapter;
|
||||
import io.lettuce.core.pubsub.StatefulRedisPubSubConnection;
|
||||
import io.lettuce.core.pubsub.api.sync.RedisPubSubCommands;
|
||||
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
/**
|
||||
* A read-only {@code DataSource} with Redis backend.
|
||||
* <p>
|
||||
* When data source init,reads form redis with string k-v,value is string format rule config data.
|
||||
* This data source subscribe from specific channel and then data published in redis with this channel,data source
|
||||
* will be notified and then update rule config in time.
|
||||
* </p>
|
||||
*
|
||||
* @author tiger
|
||||
*/
|
||||
|
||||
public class RedisDataSource<T> extends AbstractDataSource<String, T> {
|
||||
|
||||
private RedisClient redisClient = null;
|
||||
|
||||
private String ruleKey;
|
||||
|
||||
/**
|
||||
* Constructor of {@code RedisDataSource}
|
||||
*
|
||||
* @param connectionConfig redis connection config.
|
||||
* @param ruleKey data save in redis.
|
||||
* @param channel subscribe from channel.
|
||||
* @param parser convert <code>ruleKey<code>`s value to {@literal alibaba/Sentinel} rule type
|
||||
*/
|
||||
public RedisDataSource(RedisConnectionConfig connectionConfig, String ruleKey, String channel, Converter<String, T> parser) {
|
||||
super(parser);
|
||||
AssertUtil.notNull(connectionConfig, "redis connection config can not be null");
|
||||
AssertUtil.notEmpty(ruleKey, "redis subscribe ruleKey can not be empty");
|
||||
AssertUtil.notEmpty(channel, "redis subscribe channel can not be empty");
|
||||
this.redisClient = getRedisClient(connectionConfig);
|
||||
this.ruleKey = ruleKey;
|
||||
loadInitialConfig();
|
||||
subscribeFromChannel(channel);
|
||||
}
|
||||
|
||||
/**
|
||||
* build redis client form {@code RedisConnectionConfig} with io.lettuce.
|
||||
*
|
||||
* @return a new {@link RedisClient}
|
||||
*/
|
||||
private RedisClient getRedisClient(RedisConnectionConfig connectionConfig) {
|
||||
if (connectionConfig.getRedisSentinels().size() == 0) {
|
||||
RecordLog.info("start standLone mode to connect to redis");
|
||||
return getRedisStandLoneClient(connectionConfig);
|
||||
} else {
|
||||
RecordLog.info("start redis sentinel mode to connect to redis");
|
||||
return getRedisSentinelClient(connectionConfig);
|
||||
}
|
||||
}
|
||||
|
||||
private RedisClient getRedisStandLoneClient(RedisConnectionConfig connectionConfig) {
|
||||
char[] password = connectionConfig.getPassword();
|
||||
String clientName = connectionConfig.getClientName();
|
||||
RedisURI.Builder redisUriBuilder = RedisURI.builder();
|
||||
redisUriBuilder.withHost(connectionConfig.getHost())
|
||||
.withPort(connectionConfig.getPort())
|
||||
.withDatabase(connectionConfig.getDatabase())
|
||||
.withTimeout(connectionConfig.getTimeout(), TimeUnit.MILLISECONDS);
|
||||
if (password != null) {
|
||||
redisUriBuilder.withPassword(connectionConfig.getPassword());
|
||||
}
|
||||
if (StringUtil.isNotEmpty(connectionConfig.getClientName())) {
|
||||
redisUriBuilder.withClientName(clientName);
|
||||
}
|
||||
return RedisClient.create(redisUriBuilder.build());
|
||||
}
|
||||
|
||||
private RedisClient getRedisSentinelClient(RedisConnectionConfig connectionConfig) {
|
||||
char[] password = connectionConfig.getPassword();
|
||||
String clientName = connectionConfig.getClientName();
|
||||
RedisURI.Builder sentinelRedisUriBuilder = RedisURI.builder();
|
||||
for (RedisConnectionConfig config : connectionConfig.getRedisSentinels()) {
|
||||
sentinelRedisUriBuilder.withSentinel(config.getHost(), config.getPort());
|
||||
}
|
||||
if (password != null) {
|
||||
sentinelRedisUriBuilder.withPassword(connectionConfig.getPassword());
|
||||
}
|
||||
if (StringUtil.isNotEmpty(connectionConfig.getClientName())) {
|
||||
sentinelRedisUriBuilder.withClientName(clientName);
|
||||
}
|
||||
sentinelRedisUriBuilder.withSentinelMasterId(connectionConfig.getRedisSentinelMasterId())
|
||||
.withTimeout(connectionConfig.getTimeout(), TimeUnit.MILLISECONDS);
|
||||
return RedisClient.create(sentinelRedisUriBuilder.build());
|
||||
}
|
||||
|
||||
private void subscribeFromChannel(String channel) {
|
||||
StatefulRedisPubSubConnection<String, String> pubSubConnection = redisClient.connectPubSub();
|
||||
RedisPubSubAdapter<String, String> adapterListener = new DelegatingRedisPubSubListener();
|
||||
pubSubConnection.addListener(adapterListener);
|
||||
RedisPubSubCommands<String, String> sync = pubSubConnection.sync();
|
||||
sync.subscribe(channel);
|
||||
}
|
||||
|
||||
private void loadInitialConfig() {
|
||||
try {
|
||||
T newValue = loadConfig();
|
||||
if (newValue == null) {
|
||||
RecordLog.warn("[RedisDataSource] WARN: initial config is null, you may have to check your data source");
|
||||
}
|
||||
getProperty().updateValue(newValue);
|
||||
} catch (Exception ex) {
|
||||
RecordLog.warn("[RedisDataSource] Error when loading initial config", ex);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public String readSource() throws Exception {
|
||||
if (this.redisClient == null) {
|
||||
throw new IllegalStateException("redis client has not been initialized or error occurred");
|
||||
}
|
||||
RedisCommands<String, String> stringRedisCommands = redisClient.connect().sync();
|
||||
return stringRedisCommands.get(ruleKey);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() throws Exception {
|
||||
redisClient.shutdown();
|
||||
}
|
||||
|
||||
private class DelegatingRedisPubSubListener extends RedisPubSubAdapter<String, String> {
|
||||
|
||||
DelegatingRedisPubSubListener() {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void message(String channel, String message) {
|
||||
RecordLog.info(String.format("[RedisDataSource] New property value received for channel %s: %s", channel, message));
|
||||
getProperty().updateValue(parser.convert(message));
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
|
@ -0,0 +1,583 @@
|
|||
/*
|
||||
* Copyright 1999-2018 Alibaba Group Holding Ltd.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package com.alibaba.csp.sentinel.datasource.redis.config;
|
||||
|
||||
import com.alibaba.csp.sentinel.datasource.redis.util.AssertUtil;
|
||||
import com.alibaba.csp.sentinel.util.StringUtil;
|
||||
|
||||
import java.util.*;
|
||||
|
||||
/**
|
||||
* This class provide a builder to build redis client connection config.
|
||||
*
|
||||
* @author tiger
|
||||
*/
|
||||
public class RedisConnectionConfig {
|
||||
|
||||
/**
|
||||
* The default redisSentinel port.
|
||||
*/
|
||||
public static final int DEFAULT_SENTINEL_PORT = 26379;
|
||||
|
||||
/**
|
||||
* The default redis port.
|
||||
*/
|
||||
public static final int DEFAULT_REDIS_PORT = 6379;
|
||||
|
||||
/**
|
||||
* Default timeout: 60 sec
|
||||
*/
|
||||
public static final long DEFAULT_TIMEOUT_MILLISECONDS = 60 * 1000;
|
||||
|
||||
private String host;
|
||||
private String redisSentinelMasterId;
|
||||
private int port;
|
||||
private int database;
|
||||
private String clientName;
|
||||
private char[] password;
|
||||
private long timeout = DEFAULT_TIMEOUT_MILLISECONDS;
|
||||
private final List<RedisConnectionConfig> redisSentinels = new ArrayList<RedisConnectionConfig>();
|
||||
|
||||
/**
|
||||
* Default empty constructor.
|
||||
*/
|
||||
public RedisConnectionConfig() {
|
||||
}
|
||||
|
||||
/**
|
||||
* Constructor with host/port and timeout.
|
||||
*
|
||||
* @param host the host
|
||||
* @param port the port
|
||||
* @param timeout timeout value . unit is mill seconds
|
||||
*/
|
||||
public RedisConnectionConfig(String host, int port, long timeout) {
|
||||
|
||||
AssertUtil.notEmpty(host, "Host must not be empty");
|
||||
AssertUtil.notNull(timeout, "Timeout duration must not be null");
|
||||
AssertUtil.isTrue(timeout >= 0, "Timeout duration must be greater or equal to zero");
|
||||
|
||||
setHost(host);
|
||||
setPort(port);
|
||||
setTimeout(timeout);
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Returns a new {@link RedisConnectionConfig.Builder} to construct a {@link RedisConnectionConfig}.
|
||||
*
|
||||
* @return a new {@link RedisConnectionConfig.Builder} to construct a {@link RedisConnectionConfig}.
|
||||
*/
|
||||
public static RedisConnectionConfig.Builder builder() {
|
||||
return new RedisConnectionConfig.Builder();
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Returns the host.
|
||||
*
|
||||
* @return the host.
|
||||
*/
|
||||
public String getHost() {
|
||||
return host;
|
||||
}
|
||||
|
||||
/**
|
||||
* Sets the Redis host.
|
||||
*
|
||||
* @param host the host
|
||||
*/
|
||||
public void setHost(String host) {
|
||||
this.host = host;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the Sentinel Master Id.
|
||||
*
|
||||
* @return the Sentinel Master Id.
|
||||
*/
|
||||
public String getRedisSentinelMasterId() {
|
||||
return redisSentinelMasterId;
|
||||
}
|
||||
|
||||
/**
|
||||
* Sets the Sentinel Master Id.
|
||||
*
|
||||
* @param redisSentinelMasterId the Sentinel Master Id.
|
||||
*/
|
||||
public void setRedisSentinelMasterId(String redisSentinelMasterId) {
|
||||
this.redisSentinelMasterId = redisSentinelMasterId;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the Redis port.
|
||||
*
|
||||
* @return the Redis port
|
||||
*/
|
||||
public int getPort() {
|
||||
return port;
|
||||
}
|
||||
|
||||
/**
|
||||
* Sets the Redis port. Defaults to {@link #DEFAULT_REDIS_PORT}.
|
||||
*
|
||||
* @param port the Redis port
|
||||
*/
|
||||
public void setPort(int port) {
|
||||
this.port = port;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the password.
|
||||
*
|
||||
* @return the password
|
||||
*/
|
||||
public char[] getPassword() {
|
||||
return password;
|
||||
}
|
||||
|
||||
/**
|
||||
* Sets the password. Use empty string to skip authentication.
|
||||
*
|
||||
* @param password the password, must not be {@literal null}.
|
||||
*/
|
||||
public void setPassword(String password) {
|
||||
|
||||
AssertUtil.notNull(password, "Password must not be null");
|
||||
this.password = password.toCharArray();
|
||||
}
|
||||
|
||||
/**
|
||||
* Sets the password. Use empty char array to skip authentication.
|
||||
*
|
||||
* @param password the password, must not be {@literal null}.
|
||||
*/
|
||||
public void setPassword(char[] password) {
|
||||
|
||||
AssertUtil.notNull(password, "Password must not be null");
|
||||
this.password = Arrays.copyOf(password, password.length);
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the command timeout for synchronous command execution.
|
||||
*
|
||||
* @return the Timeout
|
||||
*/
|
||||
public long getTimeout() {
|
||||
return timeout;
|
||||
}
|
||||
|
||||
/**
|
||||
* Sets the command timeout for synchronous command execution.
|
||||
*
|
||||
* @param timeout the command timeout for synchronous command execution.
|
||||
*/
|
||||
public void setTimeout(Long timeout) {
|
||||
|
||||
AssertUtil.notNull(timeout, "Timeout must not be null");
|
||||
AssertUtil.isTrue(timeout >= 0, "Timeout must be greater or equal 0");
|
||||
|
||||
this.timeout = timeout;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the Redis database number. Databases are only available for Redis Standalone and Redis Master/Slave.
|
||||
*
|
||||
* @return database
|
||||
*/
|
||||
public int getDatabase() {
|
||||
return database;
|
||||
}
|
||||
|
||||
/**
|
||||
* Sets the Redis database number. Databases are only available for Redis Standalone and Redis Master/Slave.
|
||||
*
|
||||
* @param database the Redis database number.
|
||||
*/
|
||||
public void setDatabase(int database) {
|
||||
|
||||
AssertUtil.isTrue(database >= 0, "Invalid database number: " + database);
|
||||
|
||||
this.database = database;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the client name.
|
||||
*
|
||||
* @return
|
||||
*/
|
||||
public String getClientName() {
|
||||
return clientName;
|
||||
}
|
||||
|
||||
/**
|
||||
* Sets the client name to be applied on Redis connections.
|
||||
*
|
||||
* @param clientName the client name.
|
||||
*/
|
||||
public void setClientName(String clientName) {
|
||||
this.clientName = clientName;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return the list of {@link RedisConnectionConfig Redis Sentinel URIs}.
|
||||
*/
|
||||
public List<RedisConnectionConfig> getRedisSentinels() {
|
||||
return redisSentinels;
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
final StringBuilder sb = new StringBuilder();
|
||||
sb.append(getClass().getSimpleName());
|
||||
|
||||
sb.append(" [");
|
||||
|
||||
if (host != null) {
|
||||
sb.append("host='").append(host).append('\'');
|
||||
sb.append(", port=").append(port);
|
||||
}
|
||||
if (redisSentinelMasterId != null) {
|
||||
sb.append("redisSentinels=").append(getRedisSentinels());
|
||||
sb.append(", redisSentinelMasterId=").append(redisSentinelMasterId);
|
||||
}
|
||||
|
||||
sb.append(']');
|
||||
return sb.toString();
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object o) {
|
||||
if (this == o) {
|
||||
return true;
|
||||
}
|
||||
if (!(o instanceof RedisConnectionConfig)) {
|
||||
return false;
|
||||
}
|
||||
RedisConnectionConfig redisURI = (RedisConnectionConfig) o;
|
||||
|
||||
if (port != redisURI.port) {
|
||||
return false;
|
||||
}
|
||||
if (database != redisURI.database) {
|
||||
return false;
|
||||
}
|
||||
if (host != null ? !host.equals(redisURI.host) : redisURI.host != null) {
|
||||
return false;
|
||||
}
|
||||
if (redisSentinelMasterId != null ? !redisSentinelMasterId.equals(redisURI.redisSentinelMasterId) : redisURI.redisSentinelMasterId != null) {
|
||||
return false;
|
||||
}
|
||||
return !(redisSentinels != null ? !redisSentinels.equals(redisURI.redisSentinels) : redisURI.redisSentinels != null);
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
int result = host != null ? host.hashCode() : 0;
|
||||
result = 31 * result + (redisSentinelMasterId != null ? redisSentinelMasterId.hashCode() : 0);
|
||||
result = 31 * result + port;
|
||||
result = 31 * result + database;
|
||||
result = 31 * result + (redisSentinels != null ? redisSentinels.hashCode() : 0);
|
||||
return result;
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Builder for Redis RedisConnectionConfig.
|
||||
*/
|
||||
public static class Builder {
|
||||
|
||||
private String host;
|
||||
private String redisSentinelMasterId;
|
||||
private int port;
|
||||
private int database;
|
||||
private String clientName;
|
||||
private char[] password;
|
||||
private long timeout = DEFAULT_TIMEOUT_MILLISECONDS;
|
||||
private final List<RedisHostAndPort> redisSentinels = new ArrayList<RedisHostAndPort>();
|
||||
|
||||
private Builder() {
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Set Redis host. Creates a new builder.
|
||||
*
|
||||
* @param host the host name
|
||||
* @return New builder with Redis host/port.
|
||||
*/
|
||||
public static RedisConnectionConfig.Builder redis(String host) {
|
||||
return redis(host, DEFAULT_REDIS_PORT);
|
||||
}
|
||||
|
||||
/**
|
||||
* Set Redis host and port. Creates a new builder
|
||||
*
|
||||
* @param host the host name
|
||||
* @param port the port
|
||||
* @return New builder with Redis host/port.
|
||||
*/
|
||||
public static RedisConnectionConfig.Builder redis(String host, int port) {
|
||||
|
||||
AssertUtil.notEmpty(host, "Host must not be empty");
|
||||
AssertUtil.isTrue(isValidPort(port), String.format("Port out of range: %s", port));
|
||||
|
||||
Builder builder = RedisConnectionConfig.builder();
|
||||
return builder.withHost(host).withPort(port);
|
||||
}
|
||||
|
||||
/**
|
||||
* Set Sentinel host. Creates a new builder.
|
||||
*
|
||||
* @param host the host name
|
||||
* @return New builder with Sentinel host/port.
|
||||
*/
|
||||
public static RedisConnectionConfig.Builder redisSentinel(String host) {
|
||||
|
||||
AssertUtil.notEmpty(host, "Host must not be empty");
|
||||
|
||||
RedisConnectionConfig.Builder builder = RedisConnectionConfig.builder();
|
||||
return builder.withRedisSentinel(host);
|
||||
}
|
||||
|
||||
/**
|
||||
* Set Sentinel host and port. Creates a new builder.
|
||||
*
|
||||
* @param host the host name
|
||||
* @param port the port
|
||||
* @return New builder with Sentinel host/port.
|
||||
*/
|
||||
public static RedisConnectionConfig.Builder redisSentinel(String host, int port) {
|
||||
|
||||
AssertUtil.notEmpty(host, "Host must not be empty");
|
||||
AssertUtil.isTrue(isValidPort(port), String.format("Port out of range: %s", port));
|
||||
|
||||
RedisConnectionConfig.Builder builder = RedisConnectionConfig.builder();
|
||||
return builder.withRedisSentinel(host, port);
|
||||
}
|
||||
|
||||
/**
|
||||
* Set Sentinel host and master id. Creates a new builder.
|
||||
*
|
||||
* @param host the host name
|
||||
* @param masterId redisSentinel master id
|
||||
* @return New builder with Sentinel host/port.
|
||||
*/
|
||||
public static RedisConnectionConfig.Builder redisSentinel(String host, String masterId) {
|
||||
return redisSentinel(host, DEFAULT_SENTINEL_PORT, masterId);
|
||||
}
|
||||
|
||||
/**
|
||||
* Set Sentinel host, port and master id. Creates a new builder.
|
||||
*
|
||||
* @param host the host name
|
||||
* @param port the port
|
||||
* @param masterId redisSentinel master id
|
||||
* @return New builder with Sentinel host/port.
|
||||
*/
|
||||
public static RedisConnectionConfig.Builder redisSentinel(String host, int port, String masterId) {
|
||||
|
||||
AssertUtil.notEmpty(host, "Host must not be empty");
|
||||
AssertUtil.isTrue(isValidPort(port), String.format("Port out of range: %s", port));
|
||||
|
||||
RedisConnectionConfig.Builder builder = RedisConnectionConfig.builder();
|
||||
return builder.withSentinelMasterId(masterId).withRedisSentinel(host, port);
|
||||
}
|
||||
|
||||
/**
|
||||
* Add a withRedisSentinel host to the existing builder.
|
||||
*
|
||||
* @param host the host name
|
||||
* @return the builder
|
||||
*/
|
||||
public RedisConnectionConfig.Builder withRedisSentinel(String host) {
|
||||
return withRedisSentinel(host, DEFAULT_SENTINEL_PORT);
|
||||
}
|
||||
|
||||
/**
|
||||
* Add a withRedisSentinel host/port to the existing builder.
|
||||
*
|
||||
* @param host the host name
|
||||
* @param port the port
|
||||
* @return the builder
|
||||
*/
|
||||
public RedisConnectionConfig.Builder withRedisSentinel(String host, int port) {
|
||||
|
||||
AssertUtil.assertState(this.host == null, "Cannot use with Redis mode.");
|
||||
AssertUtil.notEmpty(host, "Host must not be empty");
|
||||
AssertUtil.isTrue(isValidPort(port), String.format("Port out of range: %s", port));
|
||||
|
||||
redisSentinels.add(RedisHostAndPort.of(host, port));
|
||||
return this;
|
||||
}
|
||||
|
||||
/**
|
||||
* Adds host information to the builder. Does only affect Redis URI, cannot be used with Sentinel connections.
|
||||
*
|
||||
* @param host the port
|
||||
* @return the builder
|
||||
*/
|
||||
public RedisConnectionConfig.Builder withHost(String host) {
|
||||
|
||||
AssertUtil.assertState(this.redisSentinels.isEmpty(), "Sentinels are non-empty. Cannot use in Sentinel mode.");
|
||||
AssertUtil.notEmpty(host, "Host must not be empty");
|
||||
|
||||
this.host = host;
|
||||
return this;
|
||||
}
|
||||
|
||||
/**
|
||||
* Adds port information to the builder. Does only affect Redis URI, cannot be used with Sentinel connections.
|
||||
*
|
||||
* @param port the port
|
||||
* @return the builder
|
||||
*/
|
||||
public RedisConnectionConfig.Builder withPort(int port) {
|
||||
|
||||
AssertUtil.assertState(this.host != null, "Host is null. Cannot use in Sentinel mode.");
|
||||
AssertUtil.isTrue(isValidPort(port), String.format("Port out of range: %s", port));
|
||||
|
||||
this.port = port;
|
||||
return this;
|
||||
}
|
||||
|
||||
/**
|
||||
* Configures the database number.
|
||||
*
|
||||
* @param database the database number
|
||||
* @return the builder
|
||||
*/
|
||||
public RedisConnectionConfig.Builder withDatabase(int database) {
|
||||
|
||||
AssertUtil.isTrue(database >= 0, "Invalid database number: " + database);
|
||||
|
||||
this.database = database;
|
||||
return this;
|
||||
}
|
||||
|
||||
/**
|
||||
* Configures a client name.
|
||||
*
|
||||
* @param clientName the client name
|
||||
* @return the builder
|
||||
*/
|
||||
public RedisConnectionConfig.Builder withClientName(String clientName) {
|
||||
|
||||
AssertUtil.notNull(clientName, "Client name must not be null");
|
||||
|
||||
this.clientName = clientName;
|
||||
return this;
|
||||
}
|
||||
|
||||
/**
|
||||
* Configures authentication.
|
||||
*
|
||||
* @param password the password
|
||||
* @return the builder
|
||||
*/
|
||||
public RedisConnectionConfig.Builder withPassword(String password) {
|
||||
|
||||
AssertUtil.notNull(password, "Password must not be null");
|
||||
|
||||
return withPassword(password.toCharArray());
|
||||
}
|
||||
|
||||
/**
|
||||
* Configures authentication.
|
||||
*
|
||||
* @param password the password
|
||||
* @return the builder
|
||||
*/
|
||||
public RedisConnectionConfig.Builder withPassword(char[] password) {
|
||||
|
||||
AssertUtil.notNull(password, "Password must not be null");
|
||||
|
||||
this.password = Arrays.copyOf(password, password.length);
|
||||
return this;
|
||||
}
|
||||
|
||||
/**
|
||||
* Configures a timeout.
|
||||
*
|
||||
* @param timeout must not be {@literal null} or negative.
|
||||
* @return the builder
|
||||
*/
|
||||
public RedisConnectionConfig.Builder withTimeout(long timeout) {
|
||||
|
||||
AssertUtil.notNull(timeout, "Timeout must not be null");
|
||||
AssertUtil.notNull(timeout >= 0, "Timeout must be greater or equal 0");
|
||||
|
||||
this.timeout = timeout;
|
||||
return this;
|
||||
}
|
||||
|
||||
/**
|
||||
* Configures a redisSentinel master Id.
|
||||
*
|
||||
* @param sentinelMasterId redisSentinel master id, must not be empty or {@literal null}
|
||||
* @return the builder
|
||||
*/
|
||||
public RedisConnectionConfig.Builder withSentinelMasterId(String sentinelMasterId) {
|
||||
|
||||
AssertUtil.notEmpty(sentinelMasterId, "Sentinel master id must not empty");
|
||||
|
||||
this.redisSentinelMasterId = sentinelMasterId;
|
||||
return this;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return the RedisConnectionConfig.
|
||||
*/
|
||||
public RedisConnectionConfig build() {
|
||||
|
||||
if (redisSentinels.isEmpty() && StringUtil.isEmpty(host)) {
|
||||
throw new IllegalStateException(
|
||||
"Cannot build a RedisConnectionConfig. One of the following must be provided Host, Socket or Sentinel");
|
||||
}
|
||||
|
||||
RedisConnectionConfig redisURI = new RedisConnectionConfig();
|
||||
redisURI.setHost(host);
|
||||
redisURI.setPort(port);
|
||||
|
||||
if (password != null) {
|
||||
redisURI.setPassword(password);
|
||||
}
|
||||
|
||||
redisURI.setDatabase(database);
|
||||
redisURI.setClientName(clientName);
|
||||
|
||||
redisURI.setRedisSentinelMasterId(redisSentinelMasterId);
|
||||
|
||||
for (RedisHostAndPort sentinel : redisSentinels) {
|
||||
redisURI.getRedisSentinels().add(new RedisConnectionConfig(sentinel.getHost(), sentinel.getPort(), timeout));
|
||||
}
|
||||
|
||||
redisURI.setTimeout(timeout);
|
||||
|
||||
return redisURI;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Return true for valid port numbers.
|
||||
*/
|
||||
private static boolean isValidPort(int port) {
|
||||
return port >= 0 && port <= 65535;
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,117 @@
|
|||
/*
|
||||
* Copyright 1999-2018 Alibaba Group Holding Ltd.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package com.alibaba.csp.sentinel.datasource.redis.config;
|
||||
|
||||
import com.alibaba.csp.sentinel.datasource.redis.util.AssertUtil;
|
||||
|
||||
/**
|
||||
* An immutable representation of a host and port.
|
||||
*
|
||||
* @author tiger
|
||||
*/
|
||||
public class RedisHostAndPort {
|
||||
|
||||
private static final int NO_PORT = -1;
|
||||
|
||||
public final String host;
|
||||
public final int port;
|
||||
|
||||
/**
|
||||
* @param host must not be empty or {@literal null}.
|
||||
* @param port
|
||||
*/
|
||||
private RedisHostAndPort(String host, int port) {
|
||||
AssertUtil.notNull(host, "host must not be null");
|
||||
|
||||
this.host = host;
|
||||
this.port = port;
|
||||
}
|
||||
|
||||
/**
|
||||
* Create a {@link RedisHostAndPort} of {@code host} and {@code port}
|
||||
*
|
||||
* @param host the hostname
|
||||
* @param port a valid port
|
||||
* @return the {@link RedisHostAndPort} of {@code host} and {@code port}
|
||||
*/
|
||||
public static RedisHostAndPort of(String host, int port) {
|
||||
AssertUtil.isTrue(isValidPort(port), String.format("Port out of range: %s", port));
|
||||
return new RedisHostAndPort(host, port);
|
||||
}
|
||||
|
||||
/**
|
||||
* @return {@literal true} if has a port.
|
||||
*/
|
||||
public boolean hasPort() {
|
||||
return port != NO_PORT;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return the host text.
|
||||
*/
|
||||
public String getHost() {
|
||||
return host;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return the port.
|
||||
*/
|
||||
public int getPort() {
|
||||
if (!hasPort()) {
|
||||
throw new IllegalStateException("No port present.");
|
||||
}
|
||||
return port;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object o) {
|
||||
if (this == o) {
|
||||
return true;
|
||||
}
|
||||
if (!(o instanceof RedisHostAndPort)) {
|
||||
return false;
|
||||
}
|
||||
RedisHostAndPort that = (RedisHostAndPort) o;
|
||||
return port == that.port && (host != null ? host.equals(that.host) : that.host == null);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
int result = host != null ? host.hashCode() : 0;
|
||||
result = 31 * result + port;
|
||||
return result;
|
||||
}
|
||||
|
||||
/**
|
||||
* @param port the port number
|
||||
* @return {@literal true} for valid port numbers.
|
||||
*/
|
||||
private static boolean isValidPort(int port) {
|
||||
return port >= 0 && port <= 65535;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
|
||||
StringBuilder sb = new StringBuilder();
|
||||
sb.append(host);
|
||||
if (hasPort()) {
|
||||
sb.append(':').append(port);
|
||||
}
|
||||
return sb.toString();
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,81 @@
|
|||
/*
|
||||
* Copyright 1999-2018 Alibaba Group Holding Ltd.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package com.alibaba.csp.sentinel.datasource.redis.util;
|
||||
|
||||
import com.alibaba.csp.sentinel.util.StringUtil;
|
||||
|
||||
/**
|
||||
* A util class for filed check
|
||||
*
|
||||
* @author tiger
|
||||
*/
|
||||
public class AssertUtil {
|
||||
|
||||
private AssertUtil(){}
|
||||
/**
|
||||
* Assert that a string is not empty, it must not be {@code null} and it must not be empty.
|
||||
*
|
||||
* @param string the object to check
|
||||
* @param message the exception message to use if the assertion fails
|
||||
* @throws IllegalArgumentException if the object is {@code null} or the underlying string is empty
|
||||
*/
|
||||
public static void notEmpty(String string, String message) {
|
||||
if (StringUtil.isEmpty(string)) {
|
||||
throw new IllegalArgumentException(message);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Assert that an object is not {@code null} .
|
||||
*
|
||||
* @param object the object to check
|
||||
* @param message the exception message to use if the assertion fails
|
||||
* @throws IllegalArgumentException if the object is {@code null}
|
||||
*/
|
||||
public static void notNull(Object object, String message) {
|
||||
if (object == null) {
|
||||
throw new IllegalArgumentException(message);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Assert that {@code value} is {@literal true}.
|
||||
*
|
||||
* @param value the value to check
|
||||
* @param message the exception message to use if the assertion fails
|
||||
* @throws IllegalArgumentException if the object array contains a {@code null} element
|
||||
*/
|
||||
public static void isTrue(boolean value, String message) {
|
||||
if (!value) {
|
||||
throw new IllegalArgumentException(message);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Ensures the truth of an expression involving the state of the calling instance, but not involving any parameters to the
|
||||
* calling method.
|
||||
*
|
||||
* @param condition a boolean expression
|
||||
* @param message the exception message to use if the assertion fails
|
||||
* @throws IllegalStateException if {@code expression} is false
|
||||
*/
|
||||
public static void assertState(boolean condition, String message) {
|
||||
if (!condition) {
|
||||
throw new IllegalStateException(message);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,97 @@
|
|||
/*
|
||||
* Copyright 1999-2018 Alibaba Group Holding Ltd.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package com.alibaba.csp.sentinel.datasource.redis;
|
||||
|
||||
import com.alibaba.csp.sentinel.datasource.redis.config.RedisConnectionConfig;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Test;
|
||||
|
||||
|
||||
/**
|
||||
* Test cases for {@link RedisConnectionConfig}.
|
||||
*
|
||||
* @author tiger
|
||||
*/
|
||||
public class RedisConnectionConfigTest {
|
||||
|
||||
@Test
|
||||
public void testRedisDefaultPropertySuccess() {
|
||||
String host = "localhost";
|
||||
RedisConnectionConfig redisConnectionConfig = RedisConnectionConfig.Builder.redis(host).build();
|
||||
Assert.assertEquals(host, redisConnectionConfig.getHost());
|
||||
Assert.assertEquals(RedisConnectionConfig.DEFAULT_REDIS_PORT, redisConnectionConfig.getPort());
|
||||
Assert.assertEquals(RedisConnectionConfig.DEFAULT_TIMEOUT_MILLISECONDS, redisConnectionConfig.getTimeout());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testRedisClientNamePropertySuccess() {
|
||||
String host = "localhost";
|
||||
String clientName = "clientName";
|
||||
RedisConnectionConfig redisConnectionConfig = RedisConnectionConfig.Builder.redis(host)
|
||||
.withClientName("clientName")
|
||||
.build();
|
||||
Assert.assertEquals(redisConnectionConfig.getClientName(), clientName);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testRedisTimeOutPropertySuccess() {
|
||||
String host = "localhost";
|
||||
long timeout = 70 * 1000;
|
||||
RedisConnectionConfig redisConnectionConfig = RedisConnectionConfig.Builder.redis(host)
|
||||
.withTimeout(timeout)
|
||||
.build();
|
||||
Assert.assertEquals(redisConnectionConfig.getTimeout(), timeout);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testRedisSentinelDefaultPortSuccess() {
|
||||
String host = "localhost";
|
||||
RedisConnectionConfig redisConnectionConfig = RedisConnectionConfig.Builder.redisSentinel(host)
|
||||
.withPassword("211233")
|
||||
.build();
|
||||
Assert.assertEquals(null, redisConnectionConfig.getHost());
|
||||
Assert.assertEquals(1, redisConnectionConfig.getRedisSentinels().size());
|
||||
Assert.assertEquals(RedisConnectionConfig.DEFAULT_SENTINEL_PORT, redisConnectionConfig.getRedisSentinels().get(0).getPort());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testRedisSentinelMoreThanOneServerSuccess() {
|
||||
String host = "localhost";
|
||||
String host2 = "server2";
|
||||
int port2 = 1879;
|
||||
RedisConnectionConfig redisConnectionConfig = RedisConnectionConfig.Builder.redisSentinel(host)
|
||||
.withRedisSentinel(host2, port2)
|
||||
.build();
|
||||
Assert.assertEquals(null, redisConnectionConfig.getHost());
|
||||
Assert.assertEquals(2, redisConnectionConfig.getRedisSentinels().size());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testRedisSentinelMoreThanOneDuplicateServerSuccess() {
|
||||
String host = "localhost";
|
||||
String host2 = "server2";
|
||||
int port2 = 1879;
|
||||
RedisConnectionConfig redisConnectionConfig = RedisConnectionConfig.Builder.redisSentinel(host)
|
||||
.withRedisSentinel(host2, port2)
|
||||
.withRedisSentinel(host2, port2)
|
||||
.withPassword("211233")
|
||||
.build();
|
||||
Assert.assertEquals(null, redisConnectionConfig.getHost());
|
||||
Assert.assertEquals(3, redisConnectionConfig.getRedisSentinels().size());
|
||||
}
|
||||
|
||||
}
|
||||
|
|
@ -0,0 +1,116 @@
|
|||
/*
|
||||
* Copyright 1999-2018 Alibaba Group Holding Ltd.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package com.alibaba.csp.sentinel.datasource.redis;
|
||||
|
||||
import com.alibaba.csp.sentinel.datasource.Converter;
|
||||
import com.alibaba.csp.sentinel.datasource.ReadableDataSource;
|
||||
import com.alibaba.csp.sentinel.datasource.redis.config.RedisConnectionConfig;
|
||||
import com.alibaba.csp.sentinel.slots.block.flow.FlowRule;
|
||||
import com.alibaba.csp.sentinel.slots.block.flow.FlowRuleManager;
|
||||
import com.alibaba.fastjson.JSON;
|
||||
import com.alibaba.fastjson.TypeReference;
|
||||
import io.lettuce.core.RedisClient;
|
||||
import io.lettuce.core.RedisURI;
|
||||
import io.lettuce.core.api.sync.RedisCommands;
|
||||
import org.junit.*;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Random;
|
||||
|
||||
/**
|
||||
* Redis redisSentinel mode test cases for {@link RedisDataSource}.
|
||||
*
|
||||
* @author tiger
|
||||
*/
|
||||
@Ignore(value = "before run this test. you should build your own redisSentinel config in local")
|
||||
public class SentinelModeRedisDataSourceTest {
|
||||
|
||||
private String host = "localhost";
|
||||
|
||||
private int redisSentinelPort = 5000;
|
||||
|
||||
private String redisSentinelMasterId = "mymaster";
|
||||
|
||||
private String ruleKey = "redis.redisSentinel.flow.rulekey";
|
||||
|
||||
private String channel = "redis.redisSentinel.flow.channel";
|
||||
|
||||
private final RedisClient client = RedisClient.create(RedisURI.Builder.sentinel(host, redisSentinelPort)
|
||||
.withSentinelMasterId(redisSentinelMasterId).build());
|
||||
|
||||
@Before
|
||||
public void initData() {
|
||||
Converter<String, List<FlowRule>> flowConfigParser = buildFlowConfigParser();
|
||||
RedisConnectionConfig config = RedisConnectionConfig.builder()
|
||||
.withRedisSentinel(host, redisSentinelPort)
|
||||
.withRedisSentinel(host, redisSentinelPort)
|
||||
.withSentinelMasterId(redisSentinelMasterId).build();
|
||||
initRedisRuleData();
|
||||
ReadableDataSource<String, List<FlowRule>> redisDataSource = new RedisDataSource<List<FlowRule>>(config, ruleKey, channel, flowConfigParser);
|
||||
FlowRuleManager.register2Property(redisDataSource.getProperty());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testConnectToSentinelAndPubMsgSuccess() {
|
||||
int maxQueueingTimeMs = new Random().nextInt();
|
||||
String flowRulesJson = "[{\"resource\":\"test\", \"limitApp\":\"default\", \"grade\":1, \"count\":\"0.0\", \"strategy\":0, \"refResource\":null, " +
|
||||
"\"controlBehavior\":0, \"warmUpPeriodSec\":10, \"maxQueueingTimeMs\":" + maxQueueingTimeMs + ", \"controller\":null}]";
|
||||
RedisCommands<String, String> subCommands = client.connect().sync();
|
||||
subCommands.multi();
|
||||
subCommands.set(ruleKey, flowRulesJson);
|
||||
subCommands.publish(channel, flowRulesJson);
|
||||
subCommands.exec();
|
||||
try {
|
||||
Thread.sleep(2000);
|
||||
} catch (InterruptedException e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
List<FlowRule> rules = FlowRuleManager.getRules();
|
||||
Assert.assertEquals(1, rules.size());
|
||||
rules = FlowRuleManager.getRules();
|
||||
Assert.assertEquals(rules.get(0).getMaxQueueingTimeMs(), maxQueueingTimeMs);
|
||||
String value = subCommands.get(ruleKey);
|
||||
List<FlowRule> flowRulesValuesInRedis = buildFlowConfigParser().convert(value);
|
||||
Assert.assertEquals(flowRulesValuesInRedis.size(), 1);
|
||||
Assert.assertEquals(flowRulesValuesInRedis.get(0).getMaxQueueingTimeMs(), maxQueueingTimeMs);
|
||||
}
|
||||
|
||||
@After
|
||||
public void clearResource() {
|
||||
RedisCommands<String, String> stringRedisCommands = client.connect().sync();
|
||||
stringRedisCommands.del(ruleKey);
|
||||
client.shutdown();
|
||||
}
|
||||
|
||||
private Converter<String, List<FlowRule>> buildFlowConfigParser() {
|
||||
return new Converter<String, List<FlowRule>>() {
|
||||
@Override
|
||||
public List<FlowRule> convert(String source) {
|
||||
return JSON.parseObject(source, new TypeReference<List<FlowRule>>() {
|
||||
});
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
private void initRedisRuleData() {
|
||||
String flowRulesJson = "[{\"resource\":\"test\", \"limitApp\":\"default\", \"grade\":1, \"count\":\"0.0\", \"strategy\":0, \"refResource\":null, " +
|
||||
"\"controlBehavior\":0, \"warmUpPeriodSec\":10, \"maxQueueingTimeMs\":500, \"controller\":null}]";
|
||||
RedisCommands<String, String> stringRedisCommands = client.connect().sync();
|
||||
String ok = stringRedisCommands.set(ruleKey, flowRulesJson);
|
||||
Assert.assertTrue(ok.equals("OK"));
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,145 @@
|
|||
/*
|
||||
* Copyright 1999-2018 Alibaba Group Holding Ltd.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package com.alibaba.csp.sentinel.datasource.redis;
|
||||
|
||||
import ai.grakn.redismock.RedisServer;
|
||||
import com.alibaba.csp.sentinel.datasource.Converter;
|
||||
import com.alibaba.csp.sentinel.datasource.ReadableDataSource;
|
||||
import com.alibaba.csp.sentinel.datasource.redis.config.RedisConnectionConfig;
|
||||
import com.alibaba.csp.sentinel.slots.block.flow.FlowRule;
|
||||
import com.alibaba.csp.sentinel.slots.block.flow.FlowRuleManager;
|
||||
import com.alibaba.fastjson.JSON;
|
||||
import com.alibaba.fastjson.TypeReference;
|
||||
import io.lettuce.core.RedisClient;
|
||||
import io.lettuce.core.RedisURI;
|
||||
import io.lettuce.core.api.sync.RedisCommands;
|
||||
import io.lettuce.core.pubsub.StatefulRedisPubSubConnection;
|
||||
import io.lettuce.core.pubsub.api.sync.RedisPubSubCommands;
|
||||
import org.junit.After;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.List;
|
||||
import java.util.Random;
|
||||
|
||||
/**
|
||||
* Redis standLone mode test cases for {@link RedisDataSource}.
|
||||
*
|
||||
* @author tiger
|
||||
*/
|
||||
public class StandLoneRedisDataSourceTest {
|
||||
|
||||
private static RedisServer server = null;
|
||||
|
||||
private RedisClient client;
|
||||
|
||||
private String ruleKey = "redisSentinel.flow.rulekey";
|
||||
|
||||
private String channel = "redisSentinel.flow.channel";
|
||||
|
||||
@Before
|
||||
public void buildResource() {
|
||||
try {
|
||||
//bind to a random port
|
||||
server = RedisServer.newRedisServer();
|
||||
server.start();
|
||||
} catch (IOException e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
Converter<String, List<FlowRule>> flowConfigParser = buildFlowConfigParser();
|
||||
client = RedisClient.create(RedisURI.create(server.getHost(), server.getBindPort()));
|
||||
RedisConnectionConfig config = RedisConnectionConfig.builder()
|
||||
.withHost(server.getHost())
|
||||
.withPort(server.getBindPort())
|
||||
.build();
|
||||
initRedisRuleData();
|
||||
ReadableDataSource<String, List<FlowRule>> redisDataSource = new RedisDataSource<List<FlowRule>>(config, ruleKey, channel, flowConfigParser);
|
||||
FlowRuleManager.register2Property(redisDataSource.getProperty());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testPubMsgAndReceiveSuccess() {
|
||||
List<FlowRule> rules = FlowRuleManager.getRules();
|
||||
Assert.assertEquals(1, rules.size());
|
||||
int maxQueueingTimeMs = new Random().nextInt();
|
||||
StatefulRedisPubSubConnection<String, String> connection = client.connectPubSub();
|
||||
String flowRules = "[{\"resource\":\"test\", \"limitApp\":\"default\", \"grade\":1, \"count\":\"0.0\", \"strategy\":0, \"refResource\":null, " +
|
||||
"\"controlBehavior\":0, \"warmUpPeriodSec\":10, \"maxQueueingTimeMs\":" + maxQueueingTimeMs + ", \"controller\":null}]";
|
||||
RedisPubSubCommands<String, String> subCommands = connection.sync();
|
||||
subCommands.multi();
|
||||
subCommands.set(ruleKey, flowRules);
|
||||
subCommands.publish(channel, flowRules);
|
||||
subCommands.exec();
|
||||
try {
|
||||
Thread.sleep(2000);
|
||||
} catch (InterruptedException e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
rules = FlowRuleManager.getRules();
|
||||
Assert.assertEquals(rules.get(0).getMaxQueueingTimeMs(), maxQueueingTimeMs);
|
||||
String value = subCommands.get(ruleKey);
|
||||
List<FlowRule> flowRulesValuesInRedis = buildFlowConfigParser().convert(value);
|
||||
Assert.assertEquals(flowRulesValuesInRedis.size(), 1);
|
||||
Assert.assertEquals(flowRulesValuesInRedis.get(0).getMaxQueueingTimeMs(), maxQueueingTimeMs);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testInitAndParseFlowRuleSuccess() {
|
||||
RedisCommands<String, String> stringRedisCommands = client.connect().sync();
|
||||
String value = stringRedisCommands.get(ruleKey);
|
||||
List<FlowRule> flowRules = buildFlowConfigParser().convert(value);
|
||||
Assert.assertEquals(flowRules.size(), 1);
|
||||
stringRedisCommands.del(ruleKey);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testReadResourceFail() {
|
||||
RedisCommands<String, String> stringRedisCommands = client.connect().sync();
|
||||
stringRedisCommands.del(ruleKey);
|
||||
String value = stringRedisCommands.get(ruleKey);
|
||||
Assert.assertEquals(value, null);
|
||||
}
|
||||
|
||||
@After
|
||||
public void clearResource() {
|
||||
RedisCommands<String, String> stringRedisCommands = client.connect().sync();
|
||||
stringRedisCommands.del(ruleKey);
|
||||
client.shutdown();
|
||||
server.stop();
|
||||
server = null;
|
||||
}
|
||||
|
||||
private Converter<String, List<FlowRule>> buildFlowConfigParser() {
|
||||
return new Converter<String, List<FlowRule>>() {
|
||||
@Override
|
||||
public List<FlowRule> convert(String source) {
|
||||
return JSON.parseObject(source, new TypeReference<List<FlowRule>>() {
|
||||
});
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
private void initRedisRuleData() {
|
||||
String flowRulesJson = "[{\"resource\":\"test\", \"limitApp\":\"default\", \"grade\":1, \"count\":\"0.0\", \"strategy\":0, \"refResource\":null, " +
|
||||
"\"controlBehavior\":0, \"warmUpPeriodSec\":10, \"maxQueueingTimeMs\":500, \"controller\":null}]";
|
||||
RedisCommands<String, String> stringRedisCommands = client.connect().sync();
|
||||
String ok = stringRedisCommands.set(ruleKey, flowRulesJson);
|
||||
Assert.assertEquals(ok, "OK");
|
||||
}
|
||||
}
|
||||
Loading…
Reference in New Issue