Improve cluster state manager
- Support stop cluster mode (`NOT-STARTED` mode) - Fix bug when updating cluster state via command API (should modify via state property) Signed-off-by: Eric Zhao <sczyh16@gmail.com>
This commit is contained in:
parent
99bdb9cf3c
commit
25e96a8cd3
|
|
@ -27,7 +27,10 @@ import com.alibaba.csp.sentinel.property.SentinelProperty;
|
|||
import com.alibaba.csp.sentinel.util.TimeUtil;
|
||||
|
||||
/**
|
||||
* <p>Global tate manager for Sentinel cluster. This enables switching between cluster client and server.</p>
|
||||
* <p>
|
||||
* Global state manager for Sentinel cluster.
|
||||
* This enables switching between cluster token client and server mode.
|
||||
* </p>
|
||||
*
|
||||
* @author Eric Zhao
|
||||
* @since 1.4.0
|
||||
|
|
@ -36,15 +39,14 @@ public final class ClusterStateManager {
|
|||
|
||||
public static final int CLUSTER_CLIENT = 0;
|
||||
public static final int CLUSTER_SERVER = 1;
|
||||
public static final int CLUSTER_NOT_STARTED = -1;
|
||||
|
||||
private static volatile int mode = -1;
|
||||
private static volatile int mode = CLUSTER_NOT_STARTED;
|
||||
private static volatile long lastModified = -1;
|
||||
|
||||
private static volatile SentinelProperty<Integer> stateProperty = new DynamicSentinelProperty<Integer>();
|
||||
private static final PropertyListener<Integer> PROPERTY_LISTENER = new ClusterStatePropertyListener();
|
||||
|
||||
private static final Object UPDATE_LOCK = new Object();
|
||||
|
||||
static {
|
||||
InitExecutor.doInit();
|
||||
stateProperty.addListener(PROPERTY_LISTENER);
|
||||
|
|
@ -206,34 +208,62 @@ public final class ClusterStateManager {
|
|||
private static class ClusterStatePropertyListener implements PropertyListener<Integer> {
|
||||
@Override
|
||||
public synchronized void configLoad(Integer value) {
|
||||
applyState(value);
|
||||
applyStateInternal(value);
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized void configUpdate(Integer value) {
|
||||
applyState(value);
|
||||
applyStateInternal(value);
|
||||
}
|
||||
}
|
||||
|
||||
public static boolean applyState(Integer state) {
|
||||
if (state == null || state < 0) {
|
||||
private static boolean applyStateInternal(Integer state) {
|
||||
if (state == null || state < CLUSTER_NOT_STARTED) {
|
||||
return false;
|
||||
}
|
||||
if (state == mode) {
|
||||
return true;
|
||||
}
|
||||
synchronized (UPDATE_LOCK) {
|
||||
switch (state) {
|
||||
case CLUSTER_CLIENT:
|
||||
return setToClient();
|
||||
case CLUSTER_SERVER:
|
||||
return setToServer();
|
||||
default:
|
||||
RecordLog.warn("[ClusterStateManager] Ignoring unknown cluster state: " + state);
|
||||
return false;
|
||||
}
|
||||
switch (state) {
|
||||
case CLUSTER_CLIENT:
|
||||
return setToClient();
|
||||
case CLUSTER_SERVER:
|
||||
return setToServer();
|
||||
case CLUSTER_NOT_STARTED:
|
||||
setStop();
|
||||
return true;
|
||||
default:
|
||||
RecordLog.warn("[ClusterStateManager] Ignoring unknown cluster state: " + state);
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
private static void setStop() {
|
||||
if (mode == CLUSTER_NOT_STARTED) {
|
||||
return;
|
||||
}
|
||||
RecordLog.info("[ClusterStateManager] Changing cluster mode to not-started");
|
||||
mode = CLUSTER_NOT_STARTED;
|
||||
|
||||
sleepIfNeeded();
|
||||
lastModified = TimeUtil.currentTimeMillis();
|
||||
|
||||
stopClient();
|
||||
stopServer();
|
||||
}
|
||||
|
||||
/**
|
||||
* Apply given state to cluster mode.
|
||||
*
|
||||
* @param state valid state to apply
|
||||
*/
|
||||
public static void applyState(Integer state) {
|
||||
stateProperty.updateValue(state);
|
||||
}
|
||||
|
||||
public static void markToServer() {
|
||||
mode = CLUSTER_SERVER;
|
||||
}
|
||||
|
||||
private static final int MIN_INTERVAL = 5 * 1000;
|
||||
}
|
||||
|
|
|
|||
|
|
@ -49,5 +49,9 @@ public final class TokenClientProvider {
|
|||
}
|
||||
}
|
||||
|
||||
public static boolean isClientSpiAvailable() {
|
||||
return getClient() != null;
|
||||
}
|
||||
|
||||
private TokenClientProvider() {}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -44,5 +44,9 @@ public final class EmbeddedClusterTokenServerProvider {
|
|||
return server;
|
||||
}
|
||||
|
||||
public static boolean isServerSpiAvailable() {
|
||||
return getServer() != null;
|
||||
}
|
||||
|
||||
private EmbeddedClusterTokenServerProvider() {}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -16,6 +16,8 @@
|
|||
package com.alibaba.csp.sentinel.command.handler.cluster;
|
||||
|
||||
import com.alibaba.csp.sentinel.cluster.ClusterStateManager;
|
||||
import com.alibaba.csp.sentinel.cluster.client.TokenClientProvider;
|
||||
import com.alibaba.csp.sentinel.cluster.server.EmbeddedClusterTokenServerProvider;
|
||||
import com.alibaba.csp.sentinel.command.CommandHandler;
|
||||
import com.alibaba.csp.sentinel.command.CommandRequest;
|
||||
import com.alibaba.csp.sentinel.command.CommandResponse;
|
||||
|
|
@ -33,16 +35,24 @@ public class ModifyClusterModeCommandHandler implements CommandHandler<String> {
|
|||
public CommandResponse<String> handle(CommandRequest request) {
|
||||
try {
|
||||
int mode = Integer.valueOf(request.getParam("mode"));
|
||||
RecordLog.info("[ModifyClusterModeCommandHandler] Modifying cluster mode to: " + mode);
|
||||
if (ClusterStateManager.applyState(mode)) {
|
||||
return CommandResponse.ofSuccess("success");
|
||||
} else {
|
||||
return CommandResponse.ofSuccess("failed");
|
||||
if (mode == ClusterStateManager.CLUSTER_CLIENT && !TokenClientProvider.isClientSpiAvailable()) {
|
||||
return CommandResponse.ofFailure(new IllegalStateException("token client mode not available: no SPI found"));
|
||||
}
|
||||
if (mode == ClusterStateManager.CLUSTER_SERVER && !isClusterServerSpiAvailable()) {
|
||||
return CommandResponse.ofFailure(new IllegalStateException("token server mode not available: no SPI found"));
|
||||
}
|
||||
RecordLog.info("[ModifyClusterModeCommandHandler] Modifying cluster mode to: " + mode);
|
||||
|
||||
ClusterStateManager.applyState(mode);
|
||||
return CommandResponse.ofSuccess("success");
|
||||
} catch (NumberFormatException ex) {
|
||||
return CommandResponse.ofFailure(new IllegalArgumentException("invalid mode"));
|
||||
return CommandResponse.ofFailure(new IllegalArgumentException("invalid parameter"));
|
||||
} catch (Exception ex) {
|
||||
return CommandResponse.ofFailure(ex);
|
||||
}
|
||||
}
|
||||
|
||||
private boolean isClusterServerSpiAvailable() {
|
||||
return EmbeddedClusterTokenServerProvider.isServerSpiAvailable();
|
||||
}
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in New Issue