Polish cluster client module
- Initial work Signed-off-by: Eric Zhao <sczyh16@gmail.com>
This commit is contained in:
parent
9a69104f79
commit
138c265a34
|
|
@ -17,6 +17,7 @@ package com.alibaba.csp.sentinel.cluster.client;
|
|||
|
||||
/**
|
||||
* @author Eric Zhao
|
||||
* @since 1.4.0
|
||||
*/
|
||||
public final class ClientConstants {
|
||||
|
||||
|
|
|
|||
|
|
@ -16,9 +16,9 @@
|
|||
package com.alibaba.csp.sentinel.cluster.client;
|
||||
|
||||
import java.util.Collection;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
|
||||
import com.alibaba.csp.sentinel.cluster.ClusterConstants;
|
||||
import com.alibaba.csp.sentinel.cluster.ClusterTokenClient;
|
||||
import com.alibaba.csp.sentinel.cluster.ClusterTransportClient;
|
||||
import com.alibaba.csp.sentinel.cluster.TokenResult;
|
||||
import com.alibaba.csp.sentinel.cluster.TokenResultStatus;
|
||||
|
|
@ -26,7 +26,7 @@ import com.alibaba.csp.sentinel.cluster.TokenServerDescriptor;
|
|||
import com.alibaba.csp.sentinel.cluster.client.config.ClusterClientConfig;
|
||||
import com.alibaba.csp.sentinel.cluster.client.config.ClusterClientConfigManager;
|
||||
import com.alibaba.csp.sentinel.cluster.client.config.ServerChangeObserver;
|
||||
import com.alibaba.csp.sentinel.cluster.log.ClusterStatLogUtil;
|
||||
import com.alibaba.csp.sentinel.cluster.log.ClusterClientStatLogUtil;
|
||||
import com.alibaba.csp.sentinel.cluster.request.ClusterRequest;
|
||||
import com.alibaba.csp.sentinel.cluster.request.data.FlowRequestData;
|
||||
import com.alibaba.csp.sentinel.cluster.request.data.ParamFlowRequestData;
|
||||
|
|
@ -46,6 +46,8 @@ public class DefaultClusterTokenClient implements ClusterTokenClient {
|
|||
private ClusterTransportClient transportClient;
|
||||
private TokenServerDescriptor serverDescriptor;
|
||||
|
||||
private final AtomicBoolean shouldStart = new AtomicBoolean(false);
|
||||
|
||||
public DefaultClusterTokenClient() {
|
||||
ClusterClientConfigManager.addServerChangeObserver(new ServerChangeObserver() {
|
||||
@Override
|
||||
|
|
@ -53,14 +55,10 @@ public class DefaultClusterTokenClient implements ClusterTokenClient {
|
|||
changeServer(clusterClientConfig);
|
||||
}
|
||||
});
|
||||
// TODO: check here, who should start the client?
|
||||
initNewConnection();
|
||||
}
|
||||
|
||||
public DefaultClusterTokenClient(ClusterTransportClient transportClient) {
|
||||
// TODO: only for test, remove this constructor.
|
||||
this.transportClient = transportClient;
|
||||
}
|
||||
|
||||
private boolean serverEqual(TokenServerDescriptor descriptor, ClusterClientConfig config) {
|
||||
if (descriptor == null || config == null) {
|
||||
return false;
|
||||
|
|
@ -81,9 +79,9 @@ public class DefaultClusterTokenClient implements ClusterTokenClient {
|
|||
try {
|
||||
this.transportClient = new NettyTransportClient(host, port);
|
||||
this.serverDescriptor = new TokenServerDescriptor(host, port);
|
||||
transportClient.start();
|
||||
RecordLog.info("[DefaultClusterTokenClient] New client created: " + serverDescriptor);
|
||||
} catch (Exception ex) {
|
||||
ex.printStackTrace();
|
||||
RecordLog.warn("[DefaultClusterTokenClient] Failed to initialize new token client", ex);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -93,17 +91,46 @@ public class DefaultClusterTokenClient implements ClusterTokenClient {
|
|||
}
|
||||
try {
|
||||
// TODO: what if the client is pending init?
|
||||
if (transportClient != null && transportClient.isReady()) {
|
||||
if (transportClient != null) {
|
||||
transportClient.stop();
|
||||
}
|
||||
// Replace with new, even if the new client is not ready.
|
||||
this.transportClient = new NettyTransportClient(config);
|
||||
this.serverDescriptor = new TokenServerDescriptor(config.getServerHost(), config.getServerPort());
|
||||
transportClient.start();
|
||||
startClientIfScheduled();
|
||||
RecordLog.info("[DefaultClusterTokenClient] New client created: " + serverDescriptor);
|
||||
} catch (Exception ex) {
|
||||
RecordLog.warn("[DefaultClusterTokenClient] Failed to change remote token server", ex);
|
||||
ex.printStackTrace();
|
||||
}
|
||||
}
|
||||
|
||||
private void startClientIfScheduled() throws Exception {
|
||||
if (shouldStart.get()) {
|
||||
if (transportClient != null) {
|
||||
transportClient.start();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void stopClientIfStarted() throws Exception {
|
||||
if (shouldStart.get()) {
|
||||
if (transportClient != null) {
|
||||
transportClient.stop();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void start() throws Exception {
|
||||
if (shouldStart.compareAndSet(false, true)) {
|
||||
startClientIfScheduled();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void stop() throws Exception {
|
||||
if (shouldStart.compareAndSet(true, false)) {
|
||||
stopClientIfStarted();
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -123,7 +150,7 @@ public class DefaultClusterTokenClient implements ClusterTokenClient {
|
|||
try {
|
||||
return sendTokenRequest(request);
|
||||
} catch (Exception ex) {
|
||||
ClusterStatLogUtil.log(ex.getMessage());
|
||||
ClusterClientStatLogUtil.log(ex.getMessage());
|
||||
return new TokenResult(TokenResultStatus.FAIL);
|
||||
}
|
||||
}
|
||||
|
|
@ -139,7 +166,7 @@ public class DefaultClusterTokenClient implements ClusterTokenClient {
|
|||
try {
|
||||
return sendTokenRequest(request);
|
||||
} catch (Exception ex) {
|
||||
ClusterStatLogUtil.log(ex.getMessage());
|
||||
ClusterClientStatLogUtil.log(ex.getMessage());
|
||||
return new TokenResult(TokenResultStatus.FAIL);
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -16,6 +16,7 @@
|
|||
package com.alibaba.csp.sentinel.cluster.client;
|
||||
|
||||
import java.util.AbstractMap.SimpleEntry;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
import com.alibaba.csp.sentinel.cluster.ClusterErrorMessages;
|
||||
|
|
@ -56,6 +57,8 @@ import io.netty.util.concurrent.GenericFutureListener;
|
|||
*/
|
||||
public class NettyTransportClient implements ClusterTransportClient {
|
||||
|
||||
public static final int RECONNECT_DELAY_MS = 1000;
|
||||
|
||||
private final String host;
|
||||
private final int port;
|
||||
|
||||
|
|
@ -63,8 +66,9 @@ public class NettyTransportClient implements ClusterTransportClient {
|
|||
private NioEventLoopGroup eventLoopGroup;
|
||||
private TokenClientHandler clientHandler;
|
||||
|
||||
private AtomicInteger idGenerator = new AtomicInteger(0);
|
||||
private AtomicInteger failConnectedTime = new AtomicInteger(0);
|
||||
private final AtomicInteger idGenerator = new AtomicInteger(0);
|
||||
private final AtomicInteger currentState = new AtomicInteger(ClientConstants.CLIENT_STATUS_OFF);
|
||||
private final AtomicInteger failConnectedTime = new AtomicInteger(0);
|
||||
|
||||
public NettyTransportClient(ClusterClientConfig clientConfig) {
|
||||
AssertUtil.notNull(clientConfig, "client config cannot be null");
|
||||
|
|
@ -91,7 +95,7 @@ public class NettyTransportClient implements ClusterTransportClient {
|
|||
.handler(new ChannelInitializer<SocketChannel>() {
|
||||
@Override
|
||||
public void initChannel(SocketChannel ch) throws Exception {
|
||||
clientHandler = new TokenClientHandler();
|
||||
clientHandler = new TokenClientHandler(currentState, disconnectCallback);
|
||||
ChannelPipeline pipeline = ch.pipeline();
|
||||
pipeline.addLast(new LengthFieldBasedFrameDecoder(1024, 0, 2, 0, 2));
|
||||
pipeline.addLast(new NettyResponseDecoder());
|
||||
|
|
@ -105,24 +109,44 @@ public class NettyTransportClient implements ClusterTransportClient {
|
|||
}
|
||||
|
||||
private void connect(Bootstrap b) {
|
||||
b.connect(host, port).addListener(new GenericFutureListener<ChannelFuture>() {
|
||||
@Override
|
||||
public void operationComplete(ChannelFuture future) {
|
||||
if (future.cause() != null) {
|
||||
RecordLog.warn(
|
||||
"[NettyTransportClient] Could not connect after " + failConnectedTime.get() + " times",
|
||||
future.cause());
|
||||
failConnectedTime.incrementAndGet();
|
||||
channel = null;
|
||||
} else {
|
||||
failConnectedTime.set(0);
|
||||
channel = future.channel();
|
||||
RecordLog.info("[NettyTransportClient] Successfully connect to server " + host + ":" + port);
|
||||
if (currentState.compareAndSet(ClientConstants.CLIENT_STATUS_OFF, ClientConstants.CLIENT_STATUS_PENDING)) {
|
||||
b.connect(host, port).addListener(new GenericFutureListener<ChannelFuture>() {
|
||||
@Override
|
||||
public void operationComplete(ChannelFuture future) {
|
||||
if (future.cause() != null) {
|
||||
RecordLog.warn(String.format("[NettyTransportClient] Could not connect to <%s:%d> after %d times",
|
||||
host, port, failConnectedTime.get()), future.cause());
|
||||
failConnectedTime.incrementAndGet();
|
||||
channel = null;
|
||||
} else {
|
||||
failConnectedTime.set(0);
|
||||
channel = future.channel();
|
||||
RecordLog.info("[NettyTransportClient] Successfully connect to server <" + host + ":" + port + ">");
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
private Runnable disconnectCallback = new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
if (channel != null) {
|
||||
channel.eventLoop().schedule(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
RecordLog.info("[NettyTransportClient] Reconnecting to server <" + host + ":" + port + ">");
|
||||
try {
|
||||
start();
|
||||
} catch (Exception e) {
|
||||
RecordLog.warn("[NettyTransportClient] Failed to reconnect to server", e);
|
||||
}
|
||||
}
|
||||
}, RECONNECT_DELAY_MS * (failConnectedTime.get() + 1), TimeUnit.MILLISECONDS);
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
@Override
|
||||
public void start() throws Exception {
|
||||
connect(initClientBootstrap());
|
||||
|
|
@ -130,6 +154,14 @@ public class NettyTransportClient implements ClusterTransportClient {
|
|||
|
||||
@Override
|
||||
public void stop() throws Exception {
|
||||
while (currentState.get() == ClientConstants.CLIENT_STATUS_PENDING) {
|
||||
try {
|
||||
Thread.sleep(500);
|
||||
} catch (Exception ex) {
|
||||
// Ignore.
|
||||
}
|
||||
}
|
||||
|
||||
if (channel != null) {
|
||||
channel.close();
|
||||
channel = null;
|
||||
|
|
@ -139,7 +171,7 @@ public class NettyTransportClient implements ClusterTransportClient {
|
|||
}
|
||||
failConnectedTime.set(0);
|
||||
|
||||
RecordLog.info("[NettyTransportClient] Token client stopped");
|
||||
RecordLog.info("[NettyTransportClient] Cluster transport client stopped");
|
||||
}
|
||||
|
||||
private boolean validRequest(Request request) {
|
||||
|
|
|
|||
|
|
@ -36,7 +36,6 @@ public class DefaultRequestEntityWriter implements RequestEntityWriter<ClusterRe
|
|||
EntityWriter<Object, ByteBuf> requestDataWriter = RequestDataWriterRegistry.getWriter(type);
|
||||
|
||||
if (requestDataWriter == null) {
|
||||
// TODO: may need to throw exception?
|
||||
RecordLog.warn("[DefaultRequestEntityWriter] Cannot find matching request writer for type <{0}>,"
|
||||
+ " dropping the request", type);
|
||||
return;
|
||||
|
|
|
|||
|
|
@ -55,7 +55,6 @@ public class DefaultResponseEntityDecoder implements ResponseEntityDecoder<ByteB
|
|||
if (source.readableBytes() == 0) {
|
||||
data = null;
|
||||
} else {
|
||||
// TODO: handle decode error here.
|
||||
data = decoder.decode(source);
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -22,7 +22,7 @@ import io.netty.buffer.ByteBuf;
|
|||
|
||||
/**
|
||||
* +-------------------+--------------+----------------+---------------+------------------+
|
||||
* | RequestID(4 byte) | Type(1 byte) | FlowID(4 byte) | Count(4 byte) | PriorityFlag (1) |
|
||||
* | RequestID(8 byte) | Type(1 byte) | FlowID(4 byte) | Count(4 byte) | PriorityFlag (1) |
|
||||
* +-------------------+--------------+----------------+---------------+------------------+
|
||||
*
|
||||
* @author Eric Zhao
|
||||
|
|
|
|||
|
|
@ -24,6 +24,7 @@ import com.alibaba.csp.sentinel.cluster.request.data.ParamFlowRequestData;
|
|||
import io.netty.buffer.ByteBuf;
|
||||
|
||||
/**
|
||||
* @author jialiang.linjl
|
||||
* @author Eric Zhao
|
||||
* @since 1.4.0
|
||||
*/
|
||||
|
|
|
|||
|
|
@ -0,0 +1,38 @@
|
|||
/*
|
||||
* Copyright 1999-2018 Alibaba Group Holding Ltd.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package com.alibaba.csp.sentinel.cluster.client.codec.data;
|
||||
|
||||
import com.alibaba.csp.sentinel.cluster.codec.EntityWriter;
|
||||
import com.alibaba.csp.sentinel.util.StringUtil;
|
||||
|
||||
import io.netty.buffer.ByteBuf;
|
||||
|
||||
/**
|
||||
* @author Eric Zhao
|
||||
* @since 1.4.0
|
||||
*/
|
||||
public class PingRequestDataWriter implements EntityWriter<String, ByteBuf> {
|
||||
|
||||
@Override
|
||||
public void writeTo(String entity, ByteBuf target) {
|
||||
if (StringUtil.isBlank(entity) || target == null) {
|
||||
return;
|
||||
}
|
||||
byte[] bytes = entity.getBytes();
|
||||
target.writeInt(bytes.length);
|
||||
target.writeBytes(bytes);
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,35 @@
|
|||
/*
|
||||
* Copyright 1999-2018 Alibaba Group Holding Ltd.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package com.alibaba.csp.sentinel.cluster.client.codec.data;
|
||||
|
||||
import com.alibaba.csp.sentinel.cluster.codec.EntityDecoder;
|
||||
|
||||
import io.netty.buffer.ByteBuf;
|
||||
|
||||
/**
|
||||
* @author Eric Zhao
|
||||
* @since 1.4.0
|
||||
*/
|
||||
public class PingResponseDataDecoder implements EntityDecoder<ByteBuf, Integer> {
|
||||
|
||||
@Override
|
||||
public Integer decode(ByteBuf source) {
|
||||
if (source.readableBytes() >= 1) {
|
||||
return (int) source.readByte();
|
||||
}
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
|
|
@ -35,7 +35,6 @@ public class NettyRequestEncoder extends MessageToByteEncoder<ClusterRequest> {
|
|||
protected void encode(ChannelHandlerContext ctx, ClusterRequest request, ByteBuf out) throws Exception {
|
||||
RequestEntityWriter<Request, ByteBuf> requestEntityWriter = ClientEntityCodecProvider.getRequestEntityWriter();
|
||||
if (requestEntityWriter == null) {
|
||||
// TODO: may need to throw exception?
|
||||
RecordLog.warn("[NettyRequestEncoder] Cannot resolve the global request entity writer, dropping the request");
|
||||
return;
|
||||
}
|
||||
|
|
|
|||
|
|
@ -39,7 +39,6 @@ public class NettyResponseDecoder extends ByteToMessageDecoder {
|
|||
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
|
||||
ResponseEntityDecoder<ByteBuf, Response> responseDecoder = ClientEntityCodecProvider.getResponseEntityDecoder();
|
||||
if (responseDecoder == null) {
|
||||
// TODO: may need to throw exception?
|
||||
RecordLog.warn("[NettyResponseDecoder] Cannot resolve the global response entity decoder, "
|
||||
+ "dropping the response");
|
||||
return;
|
||||
|
|
|
|||
|
|
@ -24,6 +24,7 @@ import io.netty.buffer.ByteBuf;
|
|||
|
||||
/**
|
||||
* @author Eric Zhao
|
||||
* @since 1.4.0
|
||||
*/
|
||||
public final class RequestDataWriterRegistry {
|
||||
|
||||
|
|
|
|||
|
|
@ -17,7 +17,10 @@ package com.alibaba.csp.sentinel.cluster.client.handler;
|
|||
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
import com.alibaba.csp.sentinel.cluster.ClusterConstants;
|
||||
import com.alibaba.csp.sentinel.cluster.client.ClientConstants;
|
||||
import com.alibaba.csp.sentinel.cluster.registry.ConfigSupplierRegistry;
|
||||
import com.alibaba.csp.sentinel.cluster.request.ClusterRequest;
|
||||
import com.alibaba.csp.sentinel.cluster.response.ClusterResponse;
|
||||
import com.alibaba.csp.sentinel.log.RecordLog;
|
||||
|
||||
|
|
@ -25,32 +28,63 @@ import io.netty.channel.ChannelHandlerContext;
|
|||
import io.netty.channel.ChannelInboundHandlerAdapter;
|
||||
|
||||
/**
|
||||
* Netty client handler for Sentinel token client.
|
||||
*
|
||||
* @author Eric Zhao
|
||||
* @since 1.4.0
|
||||
*/
|
||||
public class TokenClientHandler extends ChannelInboundHandlerAdapter {
|
||||
|
||||
private final AtomicInteger currentState = new AtomicInteger(ClientConstants.CLIENT_STATUS_OFF);
|
||||
private final AtomicInteger currentState;
|
||||
private final Runnable disconnectCallback;
|
||||
|
||||
public TokenClientHandler(AtomicInteger currentState, Runnable disconnectCallback) {
|
||||
this.currentState = currentState;
|
||||
this.disconnectCallback = disconnectCallback;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void channelActive(ChannelHandlerContext ctx) throws Exception {
|
||||
currentState.set(ClientConstants.CLIENT_STATUS_STARTED);
|
||||
currentState.compareAndSet(ClientConstants.CLIENT_STATUS_PENDING, ClientConstants.CLIENT_STATUS_STARTED);
|
||||
fireClientPing(ctx);
|
||||
RecordLog.info("[TokenClientHandler] Client handler active, remote address: " + ctx.channel().remoteAddress());
|
||||
}
|
||||
|
||||
@Override
|
||||
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
|
||||
System.out.println(String.format("[%s] Client message recv: %s", System.currentTimeMillis(), msg));
|
||||
System.out.println(String.format("[%s] Client message recv: %s", System.currentTimeMillis(), msg)); // TODO: remove here
|
||||
if (msg instanceof ClusterResponse) {
|
||||
ClusterResponse<?> response = (ClusterResponse) msg;
|
||||
|
||||
if (response.getType() == ClusterConstants.MSG_TYPE_PING) {
|
||||
handlePingResponse(ctx, response);
|
||||
return;
|
||||
}
|
||||
|
||||
TokenClientPromiseHolder.completePromise(response.getId(), response);
|
||||
}
|
||||
}
|
||||
|
||||
private void fireClientPing(ChannelHandlerContext ctx) {
|
||||
// Data body: namespace of the client.
|
||||
ClusterRequest<String> ping = new ClusterRequest<String>().setId(0)
|
||||
.setType(ClusterConstants.MSG_TYPE_PING)
|
||||
.setData(ConfigSupplierRegistry.getNamespaceSupplier().get());
|
||||
ctx.writeAndFlush(ping);
|
||||
}
|
||||
|
||||
private void handlePingResponse(ChannelHandlerContext ctx, ClusterResponse response) {
|
||||
if (response.getStatus() == ClusterConstants.RESPONSE_STATUS_OK) {
|
||||
int count = (int) response.getData();
|
||||
RecordLog.info("[TokenClientHandler] Client ping OK (target server: {0}, connected count: {1})",
|
||||
ctx.channel().remoteAddress(), count);
|
||||
return;
|
||||
}
|
||||
RecordLog.warn("[TokenClientHandler] Client ping failed (target server: {0})", ctx.channel().remoteAddress());
|
||||
}
|
||||
|
||||
@Override
|
||||
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
|
||||
// TODO: should close the connection when an exception is raised.
|
||||
RecordLog.warn("[TokenClientHandler] Client exception caught", cause);
|
||||
}
|
||||
|
||||
|
|
@ -61,7 +95,9 @@ public class TokenClientHandler extends ChannelInboundHandlerAdapter {
|
|||
|
||||
@Override
|
||||
public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
|
||||
currentState.set(ClientConstants.CLIENT_STATUS_OFF);
|
||||
RecordLog.info("[TokenClientHandler] Client channel unregistered, remote address: " + ctx.channel().remoteAddress());
|
||||
currentState.compareAndSet(ClientConstants.CLIENT_STATUS_STARTED, ClientConstants.CLIENT_STATUS_OFF);
|
||||
disconnectCallback.run();
|
||||
}
|
||||
|
||||
public int getCurrentState() {
|
||||
|
|
|
|||
|
|
@ -47,13 +47,17 @@ public final class TokenClientPromiseHolder {
|
|||
if (!PROMISE_MAP.containsKey(xid)) {
|
||||
return false;
|
||||
}
|
||||
ChannelPromise promise = PROMISE_MAP.get(xid).getKey();
|
||||
if (promise.isDone() || promise.isCancelled()) {
|
||||
return false;
|
||||
SimpleEntry<ChannelPromise, ClusterResponse> entry = PROMISE_MAP.get(xid);
|
||||
if (entry != null) {
|
||||
ChannelPromise promise = entry.getKey();
|
||||
if (promise.isDone() || promise.isCancelled()) {
|
||||
return false;
|
||||
}
|
||||
entry.setValue(response);
|
||||
promise.setSuccess();
|
||||
return true;
|
||||
}
|
||||
PROMISE_MAP.get(xid).setValue(response);
|
||||
promise.setSuccess();
|
||||
return true;
|
||||
return false;
|
||||
}
|
||||
|
||||
private TokenClientPromiseHolder() {}
|
||||
|
|
|
|||
|
|
@ -0,0 +1,53 @@
|
|||
/*
|
||||
* Copyright 1999-2018 Alibaba Group Holding Ltd.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package com.alibaba.csp.sentinel.cluster.client.init;
|
||||
|
||||
import com.alibaba.csp.sentinel.cluster.client.ClientConstants;
|
||||
import com.alibaba.csp.sentinel.cluster.client.codec.data.FlowRequestDataWriter;
|
||||
import com.alibaba.csp.sentinel.cluster.client.codec.data.FlowResponseDataDecoder;
|
||||
import com.alibaba.csp.sentinel.cluster.client.codec.data.ParamFlowRequestDataWriter;
|
||||
import com.alibaba.csp.sentinel.cluster.client.codec.data.PingRequestDataWriter;
|
||||
import com.alibaba.csp.sentinel.cluster.client.codec.data.PingResponseDataDecoder;
|
||||
import com.alibaba.csp.sentinel.cluster.client.codec.registry.RequestDataWriterRegistry;
|
||||
import com.alibaba.csp.sentinel.cluster.client.codec.registry.ResponseDataDecodeRegistry;
|
||||
import com.alibaba.csp.sentinel.init.InitFunc;
|
||||
import com.alibaba.csp.sentinel.init.InitOrder;
|
||||
|
||||
/**
|
||||
* @author Eric Zhao
|
||||
* @since 1.4.0
|
||||
*/
|
||||
@InitOrder(0)
|
||||
public class DefaultClusterClientInitFunc implements InitFunc {
|
||||
|
||||
@Override
|
||||
public void init() throws Exception {
|
||||
initDefaultEntityWriters();
|
||||
initDefaultEntityDecoders();
|
||||
}
|
||||
|
||||
private void initDefaultEntityWriters() {
|
||||
RequestDataWriterRegistry.addWriter(ClientConstants.TYPE_PING, new PingRequestDataWriter());
|
||||
RequestDataWriterRegistry.addWriter(ClientConstants.TYPE_FLOW, new FlowRequestDataWriter());
|
||||
RequestDataWriterRegistry.addWriter(ClientConstants.TYPE_PARAM_FLOW, new ParamFlowRequestDataWriter());
|
||||
}
|
||||
|
||||
private void initDefaultEntityDecoders() {
|
||||
ResponseDataDecodeRegistry.addDecoder(ClientConstants.TYPE_PING, new PingResponseDataDecoder());
|
||||
ResponseDataDecodeRegistry.addDecoder(ClientConstants.TYPE_FLOW, new FlowResponseDataDecoder());
|
||||
ResponseDataDecodeRegistry.addDecoder(ClientConstants.TYPE_PARAM_FLOW, new FlowResponseDataDecoder());
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1 @@
|
|||
com.alibaba.csp.sentinel.cluster.client.init.DefaultClusterClientInitFunc
|
||||
Loading…
Reference in New Issue