Improve RT statistic and exception tracing in Sentinel gRPC adapter (#995)

This commit is contained in:
chenledong 2019-08-16 17:40:11 +08:00 committed by Eric Zhao
parent 39293c118b
commit c8df7e7456
10 changed files with 198 additions and 406 deletions

View File

@ -3,7 +3,6 @@
Sentinel gRPC Adapter provides client and server interceptor for gRPC services. Sentinel gRPC Adapter provides client and server interceptor for gRPC services.
> Note that currently the interceptor only supports unary methods in gRPC. > Note that currently the interceptor only supports unary methods in gRPC.
> In some circumstances (e.g. asynchronous call), the RT metrics might not be accurate.
## Client Interceptor ## Client Interceptor
@ -35,4 +34,3 @@ Server server = ServerBuilder.forPort(port)
.intercept(new SentinelGrpcServerInterceptor()) // Add the server interceptor. .intercept(new SentinelGrpcServerInterceptor()) // Add the server interceptor.
.build(); .build();
``` ```

View File

@ -15,16 +15,23 @@
*/ */
package com.alibaba.csp.sentinel.adapter.grpc; package com.alibaba.csp.sentinel.adapter.grpc;
import com.alibaba.csp.sentinel.AsyncEntry; import com.alibaba.csp.sentinel.Entry;
import com.alibaba.csp.sentinel.EntryType; import com.alibaba.csp.sentinel.EntryType;
import com.alibaba.csp.sentinel.SphU; import com.alibaba.csp.sentinel.SphU;
import com.alibaba.csp.sentinel.Tracer; import com.alibaba.csp.sentinel.Tracer;
import com.alibaba.csp.sentinel.context.ContextUtil;
import com.alibaba.csp.sentinel.slots.block.BlockException; import com.alibaba.csp.sentinel.slots.block.BlockException;
import io.grpc.*; import io.grpc.CallOptions;
import io.grpc.ForwardingClientCallListener.SimpleForwardingClientCallListener; import io.grpc.Channel;
import io.grpc.ClientCall;
import io.grpc.ClientInterceptor;
import io.grpc.ForwardingClientCall;
import io.grpc.ForwardingClientCallListener;
import io.grpc.Metadata;
import io.grpc.MethodDescriptor;
import io.grpc.Status;
import javax.annotation.Nullable; import javax.annotation.Nullable;
import java.util.concurrent.atomic.AtomicReference;
/** /**
* <p>gRPC client interceptor for Sentinel. Currently it only works with unary methods.</p> * <p>gRPC client interceptor for Sentinel. Currently it only works with unary methods.</p>
@ -50,43 +57,54 @@ import javax.annotation.Nullable;
* @author Eric Zhao * @author Eric Zhao
*/ */
public class SentinelGrpcClientInterceptor implements ClientInterceptor { public class SentinelGrpcClientInterceptor implements ClientInterceptor {
private static final Status FLOW_CONTROL_BLOCK = Status.UNAVAILABLE.withDescription( private static final Status FLOW_CONTROL_BLOCK = Status.UNAVAILABLE.withDescription(
"Flow control limit exceeded (client side)"); "Flow control limit exceeded (client side)");
@Override @Override
public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(MethodDescriptor<ReqT, RespT> methodDescriptor, public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(MethodDescriptor<ReqT, RespT> methodDescriptor,
CallOptions callOptions, Channel channel) { CallOptions callOptions, Channel channel) {
String resourceName = methodDescriptor.getFullMethodName(); String fullMethodName = methodDescriptor.getFullMethodName();
AsyncEntry asyncEntry = null; Entry entry = null;
try { try {
asyncEntry = SphU.asyncEntry(resourceName, EntryType.OUT); entry = SphU.asyncEntry(fullMethodName, EntryType.OUT);
final AtomicReference<Entry> atomicReferenceEntry = new AtomicReference<>(entry);
final AsyncEntry tempEntry = asyncEntry;
// Allow access, forward the call. // Allow access, forward the call.
return new ForwardingClientCall.SimpleForwardingClientCall<ReqT, RespT>( return new ForwardingClientCall.SimpleForwardingClientCall<ReqT, RespT>(
channel.newCall(methodDescriptor, callOptions)) { channel.newCall(methodDescriptor, callOptions)) {
@Override @Override
public void start(Listener<RespT> responseListener, Metadata headers) { public void start(Listener<RespT> responseListener, Metadata headers) {
super.start(new SimpleForwardingClientCallListener<RespT>(responseListener) { super.start(new ForwardingClientCallListener.SimpleForwardingClientCallListener<RespT>(responseListener) {
@Override
public void onReady() {
super.onReady();
}
@Override @Override
public void onClose(Status status, Metadata trailers) { public void onClose(Status status, Metadata trailers) {
super.onClose(status, trailers); Entry entry = atomicReferenceEntry.get();
// Record the exception metrics. if (entry != null) {
if (!status.isOk()) { // Record the exception metrics.
recordException(status.asRuntimeException(), tempEntry); if (!status.isOk()) {
Tracer.traceEntry(status.asRuntimeException(), entry);
}
entry.exit();
atomicReferenceEntry.set(null);
} }
tempEntry.exit(); super.onClose(status, trailers);
} }
}, headers); }, headers);
} }
/**
* Some Exceptions will only call cancel.
*/
@Override
public void cancel(@Nullable String message, @Nullable Throwable cause) {
Entry entry = atomicReferenceEntry.get();
// Some Exceptions will call onClose and cancel.
if (entry != null) {
// Record the exception metrics.
Tracer.traceEntry(cause, entry);
entry.exit();
atomicReferenceEntry.set(null);
}
super.cancel(message, cause);
}
}; };
} catch (BlockException e) { } catch (BlockException e) {
// Flow control threshold exceeded, block the call. // Flow control threshold exceeded, block the call.
@ -98,43 +116,27 @@ public class SentinelGrpcClientInterceptor implements ClientInterceptor {
@Override @Override
public void request(int numMessages) { public void request(int numMessages) {
} }
@Override @Override
public void cancel(@Nullable String message, @Nullable Throwable cause) { public void cancel(@Nullable String message, @Nullable Throwable cause) {
} }
@Override @Override
public void halfClose() { public void halfClose() {
} }
@Override @Override
public void sendMessage(ReqT message) { public void sendMessage(ReqT message) {
} }
}; };
} catch (RuntimeException e) { } catch (RuntimeException e) {
//catch the RuntimeException newCall throws, // Catch the RuntimeException newCall throws, entry is guaranteed to exit.
// entry is guaranteed to exit if (entry != null) {
if (asyncEntry != null) { Tracer.traceEntry(e, entry);
asyncEntry.exit(); entry.exit();
} }
throw e; throw e;
} }
} }
private void recordException(final Throwable t, AsyncEntry asyncEntry) {
ContextUtil.runOnContext(asyncEntry.getAsyncContext(), new Runnable() {
@Override
public void run() {
Tracer.trace(t);
}
});
}
} }

View File

@ -15,13 +15,21 @@
*/ */
package com.alibaba.csp.sentinel.adapter.grpc; package com.alibaba.csp.sentinel.adapter.grpc;
import com.alibaba.csp.sentinel.AsyncEntry; import com.alibaba.csp.sentinel.Entry;
import com.alibaba.csp.sentinel.EntryType; import com.alibaba.csp.sentinel.EntryType;
import com.alibaba.csp.sentinel.SphU; import com.alibaba.csp.sentinel.SphU;
import com.alibaba.csp.sentinel.Tracer; import com.alibaba.csp.sentinel.Tracer;
import com.alibaba.csp.sentinel.context.ContextUtil;
import com.alibaba.csp.sentinel.slots.block.BlockException; import com.alibaba.csp.sentinel.slots.block.BlockException;
import io.grpc.*; import io.grpc.ForwardingServerCall;
import io.grpc.ForwardingServerCallListener;
import io.grpc.Metadata;
import io.grpc.ServerCall;
import io.grpc.ServerCallHandler;
import io.grpc.ServerInterceptor;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import java.util.concurrent.atomic.AtomicReference;
/** /**
* <p>gRPC server interceptor for Sentinel. Currently it only works with unary methods.</p> * <p>gRPC server interceptor for Sentinel. Currently it only works with unary methods.</p>
@ -39,45 +47,50 @@ import io.grpc.*;
* @author Eric Zhao * @author Eric Zhao
*/ */
public class SentinelGrpcServerInterceptor implements ServerInterceptor { public class SentinelGrpcServerInterceptor implements ServerInterceptor {
private static final Status FLOW_CONTROL_BLOCK = Status.UNAVAILABLE.withDescription( private static final Status FLOW_CONTROL_BLOCK = Status.UNAVAILABLE.withDescription(
"Flow control limit exceeded (server side)"); "Flow control limit exceeded (server side)");
private static final StatusRuntimeException STATUS_RUNTIME_EXCEPTION = new StatusRuntimeException(Status.CANCELLED);
@Override @Override
public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(ServerCall<ReqT, RespT> call, Metadata headers, ServerCallHandler<ReqT, RespT> next) { public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(ServerCall<ReqT, RespT> call, Metadata headers, ServerCallHandler<ReqT, RespT> next) {
String resourceName = call.getMethodDescriptor().getFullMethodName(); String fullMethodName = call.getMethodDescriptor().getFullMethodName();
// Remote address: serverCall.getAttributes().get(Grpc.TRANSPORT_ATTR_REMOTE_ADDR); // Remote address: serverCall.getAttributes().get(Grpc.TRANSPORT_ATTR_REMOTE_ADDR);
AsyncEntry entry = null; Entry entry = null;
try { try {
ContextUtil.enter(resourceName); entry = SphU.asyncEntry(fullMethodName, EntryType.IN);
entry = SphU.asyncEntry(resourceName, EntryType.IN); final AtomicReference<Entry> atomicReferenceEntry = new AtomicReference<>(entry);
// Allow access, forward the call. // Allow access, forward the call.
final AsyncEntry tempEntry = entry;
return new ForwardingServerCallListener.SimpleForwardingServerCallListener<ReqT>( return new ForwardingServerCallListener.SimpleForwardingServerCallListener<ReqT>(
next.startCall( next.startCall(
new ForwardingServerCall.SimpleForwardingServerCall<ReqT, RespT>(call) { new ForwardingServerCall.SimpleForwardingServerCall<ReqT, RespT>(call) {
@Override @Override
public void close(Status status, Metadata trailers) { public void close(Status status, Metadata trailers) {
super.close(status, trailers); Entry entry = atomicReferenceEntry.get();
// Record the exception metrics. if (entry != null) {
if (!status.isOk()) { // Record the exception metrics.
recordException(status.asException(), tempEntry); if (!status.isOk()) {
Tracer.traceEntry(status.asRuntimeException(), entry);
}
//entry exit when the call be closed
entry.exit();
} }
//entry exit when the call be closed super.close(status, trailers);
tempEntry.exit();
} }
}, headers)) { }, headers)) {
/** /**
* if call was canceled, onCancel will be called. and the close will not be called * If call was canceled, onCancel will be called. and the close will not be called
* so the server is encouraged to abort processing to save resources by onCancel * so the server is encouraged to abort processing to save resources by onCancel
* @see ServerCall.Listener#onCancel() * @see ServerCall.Listener#onCancel()
*/ */
@Override @Override
public void onCancel() { public void onCancel() {
Entry entry = atomicReferenceEntry.get();
if (entry != null) {
Tracer.traceEntry(STATUS_RUNTIME_EXCEPTION, entry);
entry.exit();
atomicReferenceEntry.set(null);
}
super.onCancel(); super.onCancel();
// request has be canceled, entry should exit
tempEntry.exit();
} }
}; };
} catch (BlockException e) { } catch (BlockException e) {
@ -85,21 +98,12 @@ public class SentinelGrpcServerInterceptor implements ServerInterceptor {
return new ServerCall.Listener<ReqT>() { return new ServerCall.Listener<ReqT>() {
}; };
} catch (RuntimeException e) { } catch (RuntimeException e) {
//catch the RuntimeException startCall throws, // Catch the RuntimeException startCall throws, entry is guaranteed to exit.
// entry is guaranteed to exit
if (entry != null) { if (entry != null) {
Tracer.traceEntry(e, entry);
entry.exit(); entry.exit();
} }
throw e; throw e;
} }
} }
private void recordException(final Throwable t, AsyncEntry tempEntry) {
ContextUtil.runOnContext(tempEntry.getAsyncContext(), new Runnable() {
@Override
public void run() {
Tracer.trace(t);
}
});
}
} }

View File

@ -15,23 +15,21 @@
*/ */
package com.alibaba.csp.sentinel.adapter.grpc; package com.alibaba.csp.sentinel.adapter.grpc;
import java.util.concurrent.TimeUnit;
import com.alibaba.csp.sentinel.adapter.grpc.gen.FooRequest; import com.alibaba.csp.sentinel.adapter.grpc.gen.FooRequest;
import com.alibaba.csp.sentinel.adapter.grpc.gen.FooResponse; import com.alibaba.csp.sentinel.adapter.grpc.gen.FooResponse;
import com.alibaba.csp.sentinel.adapter.grpc.gen.FooServiceGrpc; import com.alibaba.csp.sentinel.adapter.grpc.gen.FooServiceGrpc;
import io.grpc.ClientInterceptor; import io.grpc.ClientInterceptor;
import io.grpc.ManagedChannel; import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder; import io.grpc.ManagedChannelBuilder;
import java.util.concurrent.TimeUnit;
/** /**
* A simple wrapped gRPC client for FooService. * A simple wrapped gRPC client for FooService.
* *
* @author Eric Zhao * @author Eric Zhao
*/ */
final class FooServiceClient { final class FooServiceClient {
private final ManagedChannel channel; private final ManagedChannel channel;
private final FooServiceGrpc.FooServiceBlockingStub blockingStub; private final FooServiceGrpc.FooServiceBlockingStub blockingStub;
@ -57,7 +55,6 @@ final class FooServiceClient {
return blockingStub.sayHello(request); return blockingStub.sayHello(request);
} }
FooResponse anotherHello(FooRequest request) { FooResponse anotherHello(FooRequest request) {
if (request == null) { if (request == null) {
throw new IllegalArgumentException("Request cannot be null"); throw new IllegalArgumentException("Request cannot be null");
@ -65,21 +62,6 @@ final class FooServiceClient {
return blockingStub.anotherHello(request); return blockingStub.anotherHello(request);
} }
FooResponse helloWithEx(FooRequest request) {
if (request == null) {
throw new IllegalArgumentException("Request cannot be null");
}
return blockingStub.helloWithEx(request);
}
FooResponse anotherHelloWithEx(FooRequest request) {
if (request == null) {
throw new IllegalArgumentException("Request cannot be null");
}
return blockingStub.anotherHelloWithEx(request);
}
void shutdown() throws InterruptedException { void shutdown() throws InterruptedException {
channel.shutdown().awaitTermination(1, TimeUnit.SECONDS); channel.shutdown().awaitTermination(1, TimeUnit.SECONDS);
} }

View File

@ -24,45 +24,53 @@ import io.grpc.stub.StreamObserver;
* Implementation of FooService defined in proto. * Implementation of FooService defined in proto.
*/ */
class FooServiceImpl extends FooServiceGrpc.FooServiceImplBase { class FooServiceImpl extends FooServiceGrpc.FooServiceImplBase {
@Override @Override
public void sayHello(FooRequest request, StreamObserver<FooResponse> responseObserver) { public void sayHello(FooRequest request, StreamObserver<FooResponse> responseObserver) {
int id = request.getId();
String message = String.format("Hello %s! Your ID is %d.", request.getName(), request.getId()); switch (id) {
// Exception test
FooResponse response = FooResponse.newBuilder().setMessage(message).build(); case -1:
responseObserver.onNext(response); responseObserver.onError(new IllegalAccessException("The id is error!"));
responseObserver.onCompleted(); break;
// RT test
case -2:
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
responseObserver.onError(e);
break;
}
default:
String message = String.format("Hello %s! Your ID is %d.", request.getName(), id);
FooResponse response = FooResponse.newBuilder().setMessage(message).build();
responseObserver.onNext(response);
responseObserver.onCompleted();
break;
}
} }
@Override @Override
public void anotherHello(FooRequest request, StreamObserver<FooResponse> responseObserver) { public void anotherHello(FooRequest request, StreamObserver<FooResponse> responseObserver) {
int id = request.getId();
String message = String.format("Good day, %s (%d)", request.getName(), request.getId()); switch (id) {
FooResponse response = FooResponse.newBuilder().setMessage(message).build(); // Exception test
responseObserver.onNext(response); case -1:
responseObserver.onCompleted(); responseObserver.onError(new IllegalAccessException("The id is error!"));
} break;
@Override // RT test
public void helloWithEx(FooRequest request, StreamObserver<FooResponse> responseObserver) { case -2:
if (request.getId() == -1) { try {
responseObserver.onError(new IllegalAccessException("The id is error")); Thread.sleep(1000);
return; } catch (InterruptedException e) {
responseObserver.onError(e);
break;
}
default:
String message = String.format("Good day, %s (%d)", request.getName(), id);
FooResponse response = FooResponse.newBuilder().setMessage(message).build();
responseObserver.onNext(response);
responseObserver.onCompleted();
break;
} }
String message = String.format("Good day, %s (%d)", request.getName(), request.getId());
FooResponse response = FooResponse.newBuilder().setMessage(message).build();
responseObserver.onNext(response);
responseObserver.onCompleted();
}
@Override
public void anotherHelloWithEx(FooRequest request, StreamObserver<FooResponse> responseObserver) {
if (request.getId() == -1) {
responseObserver.onError(new IllegalAccessException("The id is error"));
return;
}
String message = String.format("Good day, %s (%d)", request.getName(), request.getId());
FooResponse response = FooResponse.newBuilder().setMessage(message).build();
responseObserver.onNext(response);
responseObserver.onCompleted();
} }
} }

View File

@ -1,104 +0,0 @@
/*
* Copyright 1999-2018 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.csp.sentinel.adapter.grpc;
import com.alibaba.csp.sentinel.EntryType;
import com.alibaba.csp.sentinel.node.ClusterNode;
import com.alibaba.csp.sentinel.slots.block.RuleConstant;
import com.alibaba.csp.sentinel.slots.block.degrade.DegradeRule;
import com.alibaba.csp.sentinel.slots.block.degrade.DegradeRuleManager;
import com.alibaba.csp.sentinel.adapter.grpc.gen.FooRequest;
import com.alibaba.csp.sentinel.adapter.grpc.gen.FooResponse;
import com.alibaba.csp.sentinel.slots.block.flow.FlowRuleManager;
import com.alibaba.csp.sentinel.slots.clusterbuilder.ClusterBuilderSlot;
import io.grpc.StatusRuntimeException;
import org.junit.After;
import org.junit.Test;
import java.io.IOException;
import java.util.Collections;
import static org.junit.Assert.*;
/**
* @author zhengzechao
*/
public class SentinelGrpcClientInterceptorDegradeTest {
private final String resourceName = "com.alibaba.sentinel.examples.FooService/helloWithEx";
private final GrpcTestServer server = new GrpcTestServer();
private final int timeWindow = 10;
private void configureDegradeRule(int count) {
DegradeRule rule = new DegradeRule()
.setCount(count)
.setGrade(RuleConstant.DEGRADE_GRADE_EXCEPTION_COUNT)
.setResource(resourceName)
.setLimitApp("default")
.as(DegradeRule.class)
.setTimeWindow(timeWindow);
DegradeRuleManager.loadRules(Collections.singletonList(rule));
}
private boolean sendRequest(FooServiceClient client) {
try {
FooResponse response = client.helloWithEx(FooRequest.newBuilder().setName("Sentinel").setId(666).build());
System.out.println("Response: " + response);
return true;
} catch (StatusRuntimeException ex) {
System.out.println("Blocked, cause: " + ex.getMessage());
return false;
}
}
@Test
public void testGrpcClientInterceptor_degrade() throws IOException {
final int port = 19316;
configureDegradeRule(1);
server.start(port, false);
FooServiceClient client = new FooServiceClient("localhost", port, new SentinelGrpcClientInterceptor());
assertFalse(sendErrorRequest(client));
ClusterNode clusterNode = ClusterBuilderSlot.getClusterNode(resourceName, EntryType.OUT);
assertNotNull(clusterNode);
assertEquals(1, clusterNode.exceptionQps(), 0.01);
// The second request will be blocked.
assertFalse(sendRequest(client));
assertEquals(1, clusterNode.blockRequest());
server.stop();
}
private boolean sendErrorRequest(FooServiceClient client) {
try {
FooResponse response = client.helloWithEx(FooRequest.newBuilder().setName("Sentinel").setId(-1).build());
System.out.println("Response: " + response);
return true;
} catch (StatusRuntimeException ex) {
System.out.println("Blocked, cause: " + ex.getMessage());
return false;
}
}
@After
public void cleanUp() {
FlowRuleManager.loadRules(null);
ClusterBuilderSlot.getClusterNodeMap().clear();
}
}

View File

@ -15,8 +15,6 @@
*/ */
package com.alibaba.csp.sentinel.adapter.grpc; package com.alibaba.csp.sentinel.adapter.grpc;
import java.util.Collections;
import com.alibaba.csp.sentinel.EntryType; import com.alibaba.csp.sentinel.EntryType;
import com.alibaba.csp.sentinel.adapter.grpc.gen.FooRequest; import com.alibaba.csp.sentinel.adapter.grpc.gen.FooRequest;
import com.alibaba.csp.sentinel.adapter.grpc.gen.FooResponse; import com.alibaba.csp.sentinel.adapter.grpc.gen.FooResponse;
@ -25,12 +23,17 @@ import com.alibaba.csp.sentinel.slots.block.RuleConstant;
import com.alibaba.csp.sentinel.slots.block.flow.FlowRule; import com.alibaba.csp.sentinel.slots.block.flow.FlowRule;
import com.alibaba.csp.sentinel.slots.block.flow.FlowRuleManager; import com.alibaba.csp.sentinel.slots.block.flow.FlowRuleManager;
import com.alibaba.csp.sentinel.slots.clusterbuilder.ClusterBuilderSlot; import com.alibaba.csp.sentinel.slots.clusterbuilder.ClusterBuilderSlot;
import io.grpc.StatusRuntimeException; import io.grpc.StatusRuntimeException;
import org.junit.After; import org.junit.After;
import org.junit.Before;
import org.junit.Test; import org.junit.Test;
import static org.junit.Assert.*; import java.util.Collections;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
/** /**
* Test cases for {@link SentinelGrpcClientInterceptor}. * Test cases for {@link SentinelGrpcClientInterceptor}.
@ -38,48 +41,52 @@ import static org.junit.Assert.*;
* @author Eric Zhao * @author Eric Zhao
*/ */
public class SentinelGrpcClientInterceptorTest { public class SentinelGrpcClientInterceptorTest {
private final String fullMethodName = "com.alibaba.sentinel.examples.FooService/sayHello";
private final String resourceName = "com.alibaba.sentinel.examples.FooService/sayHello";
private final int threshold = 2;
private final GrpcTestServer server = new GrpcTestServer(); private final GrpcTestServer server = new GrpcTestServer();
private FooServiceClient client;
private void configureFlowRule(int count) { private void configureFlowRule(int count) {
FlowRule rule = new FlowRule() FlowRule rule = new FlowRule()
.setCount(count) .setCount(count)
.setGrade(RuleConstant.FLOW_GRADE_QPS) .setGrade(RuleConstant.FLOW_GRADE_QPS)
.setResource(resourceName) .setResource(fullMethodName)
.setLimitApp("default") .setLimitApp("default")
.as(FlowRule.class); .as(FlowRule.class);
FlowRuleManager.loadRules(Collections.singletonList(rule)); FlowRuleManager.loadRules(Collections.singletonList(rule));
} }
@Test @Test
public void testGrpcClientInterceptor() throws Exception { public void testGrpcClientInterceptor() throws Exception {
final int port = 19328; final int port = 19328;
configureFlowRule(threshold);
server.start(port, false); server.start(port, false);
client = new FooServiceClient("localhost", port, new SentinelGrpcClientInterceptor());
FooServiceClient client = new FooServiceClient("localhost", port, new SentinelGrpcClientInterceptor()); configureFlowRule(Integer.MAX_VALUE);
assertTrue(sendRequest(FooRequest.newBuilder().setName("Sentinel").setId(666).build()));
assertTrue(sendRequest(client)); ClusterNode clusterNode = ClusterBuilderSlot.getClusterNode(fullMethodName, EntryType.OUT);
ClusterNode clusterNode = ClusterBuilderSlot.getClusterNode(resourceName, EntryType.OUT);
assertNotNull(clusterNode); assertNotNull(clusterNode);
assertEquals(1, clusterNode.totalRequest() - clusterNode.blockRequest()); assertEquals(1, clusterNode.totalPass());
// Not allowed to pass. // Not allowed to pass.
configureFlowRule(0); configureFlowRule(0);
// The second request will be blocked. // The second request will be blocked.
assertFalse(sendRequest(client)); assertFalse(sendRequest(FooRequest.newBuilder().setName("Sentinel").setId(666).build()));
assertEquals(1, clusterNode.blockRequest()); assertEquals(1, clusterNode.blockRequest());
configureFlowRule(Integer.MAX_VALUE);
assertFalse(sendRequest(FooRequest.newBuilder().setName("Sentinel").setId(-1).build()));
assertEquals(1, clusterNode.totalException());
configureFlowRule(Integer.MAX_VALUE);
assertTrue(sendRequest(FooRequest.newBuilder().setName("Sentinel").setId(-2).build()));
assertTrue(clusterNode.avgRt() >= 1000);
server.stop(); server.stop();
} }
private boolean sendRequest(FooServiceClient client) { private boolean sendRequest(FooRequest request) {
try { try {
FooResponse response = client.sayHello(FooRequest.newBuilder().setName("Sentinel").setId(666).build()); FooResponse response = client.sayHello(request);
System.out.println("Response: " + response); System.out.println("Response: " + response);
return true; return true;
} catch (StatusRuntimeException ex) { } catch (StatusRuntimeException ex) {
@ -88,9 +95,15 @@ public class SentinelGrpcClientInterceptorTest {
} }
} }
@After @Before
public void cleanUp() { public void cleanUpBefore() {
FlowRuleManager.loadRules(null); FlowRuleManager.loadRules(null);
ClusterBuilderSlot.getClusterNodeMap().clear(); ClusterBuilderSlot.getClusterNodeMap().clear();
} }
}
@After
public void cleanUpAfter() {
FlowRuleManager.loadRules(null);
ClusterBuilderSlot.getClusterNodeMap().clear();
}
}

View File

@ -1,114 +0,0 @@
/*
* Copyright 1999-2018 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.csp.sentinel.adapter.grpc;
import com.alibaba.csp.sentinel.EntryType;
import com.alibaba.csp.sentinel.node.ClusterNode;
import com.alibaba.csp.sentinel.adapter.grpc.gen.FooRequest;
import com.alibaba.csp.sentinel.adapter.grpc.gen.FooResponse;
import com.alibaba.csp.sentinel.slots.block.RuleConstant;
import com.alibaba.csp.sentinel.slots.block.degrade.DegradeRule;
import com.alibaba.csp.sentinel.slots.block.degrade.DegradeRuleManager;
import com.alibaba.csp.sentinel.slots.block.flow.FlowRuleManager;
import com.alibaba.csp.sentinel.slots.clusterbuilder.ClusterBuilderSlot;
import io.grpc.StatusRuntimeException;
import org.junit.After;
import org.junit.Test;
import java.io.IOException;
import java.util.Collections;
import java.util.concurrent.CountDownLatch;
import static org.junit.Assert.*;
/**
* @author zhengzechao
*/
public class SentinelGrpcServerInterceptorDegradeTest {
private final String resourceName = "com.alibaba.sentinel.examples.FooService/anotherHelloWithEx";
private final GrpcTestServer server = new GrpcTestServer();
private final int timeWindow = 10;
private FooServiceClient client;
private void configureDegradeRule(int count) {
DegradeRule rule = new DegradeRule()
.setCount(count)
.setGrade(RuleConstant.DEGRADE_GRADE_EXCEPTION_COUNT)
.setResource(resourceName)
.setLimitApp("default")
.as(DegradeRule.class)
.setTimeWindow(timeWindow);
DegradeRuleManager.loadRules(Collections.singletonList(rule));
}
private boolean sendRequest() {
try {
FooResponse response = client.anotherHelloWithEx(FooRequest.newBuilder().setName("Sentinel").setId(666)
.build());
System.out.println("Response: " + response);
return true;
} catch (StatusRuntimeException ex) {
System.out.println("Blocked, cause: " + ex.getMessage());
return false;
}
}
@Test
public void testGrpcServerInterceptor_degrade_fail_threads() throws IOException, InterruptedException {
final int port = 19349;
client = new FooServiceClient("localhost", port);
server.start(port, true);
// exception count = 1
configureDegradeRule(20);
final CountDownLatch latch = new CountDownLatch(20);
for (int i = 0; i < 20; i++) {
new Thread(new Runnable() {
@Override
public void run() {
assertFalse(sendErrorRequest());
latch.countDown();
}
}).start();
}
latch.await();
assertFalse(sendRequest());
ClusterNode clusterNode = ClusterBuilderSlot.getClusterNode(resourceName, EntryType.IN);
assertEquals(20, clusterNode.totalException());
assertEquals(1, clusterNode.blockRequest());
}
private boolean sendErrorRequest() {
try {
FooResponse response = client.anotherHelloWithEx(FooRequest.newBuilder().setName("Sentinel").setId(-1)
.build());
System.out.println("Response: " + response);
return true;
} catch (StatusRuntimeException ex) {
System.out.println("Blocked, cause: " + ex.getMessage());
return false;
}
}
@After
public void cleanUp() {
FlowRuleManager.loadRules(null);
ClusterBuilderSlot.getClusterNodeMap().clear();
}
}

View File

@ -15,8 +15,6 @@
*/ */
package com.alibaba.csp.sentinel.adapter.grpc; package com.alibaba.csp.sentinel.adapter.grpc;
import java.util.Collections;
import com.alibaba.csp.sentinel.EntryType; import com.alibaba.csp.sentinel.EntryType;
import com.alibaba.csp.sentinel.adapter.grpc.gen.FooRequest; import com.alibaba.csp.sentinel.adapter.grpc.gen.FooRequest;
import com.alibaba.csp.sentinel.adapter.grpc.gen.FooResponse; import com.alibaba.csp.sentinel.adapter.grpc.gen.FooResponse;
@ -25,12 +23,17 @@ import com.alibaba.csp.sentinel.slots.block.RuleConstant;
import com.alibaba.csp.sentinel.slots.block.flow.FlowRule; import com.alibaba.csp.sentinel.slots.block.flow.FlowRule;
import com.alibaba.csp.sentinel.slots.block.flow.FlowRuleManager; import com.alibaba.csp.sentinel.slots.block.flow.FlowRuleManager;
import com.alibaba.csp.sentinel.slots.clusterbuilder.ClusterBuilderSlot; import com.alibaba.csp.sentinel.slots.clusterbuilder.ClusterBuilderSlot;
import io.grpc.StatusRuntimeException; import io.grpc.StatusRuntimeException;
import org.junit.After; import org.junit.After;
import org.junit.Before;
import org.junit.Test; import org.junit.Test;
import static org.junit.Assert.*; import java.util.Collections;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
/** /**
* Test cases for {@link SentinelGrpcServerInterceptor}. * Test cases for {@link SentinelGrpcServerInterceptor}.
@ -38,49 +41,52 @@ import static org.junit.Assert.*;
* @author Eric Zhao * @author Eric Zhao
*/ */
public class SentinelGrpcServerInterceptorTest { public class SentinelGrpcServerInterceptorTest {
private final String resourceName = "com.alibaba.sentinel.examples.FooService/anotherHello"; private final String resourceName = "com.alibaba.sentinel.examples.FooService/anotherHello";
private final int threshold = 4;
private final GrpcTestServer server = new GrpcTestServer(); private final GrpcTestServer server = new GrpcTestServer();
private FooServiceClient client; private FooServiceClient client;
private void configureFlowRule(int count) { private void configureFlowRule(int count) {
FlowRule rule = new FlowRule() FlowRule rule = new FlowRule()
.setCount(count) .setCount(count)
.setGrade(RuleConstant.FLOW_GRADE_QPS) .setGrade(RuleConstant.FLOW_GRADE_QPS)
.setResource(resourceName) .setResource(resourceName)
.setLimitApp("default") .setLimitApp("default")
.as(FlowRule.class); .as(FlowRule.class);
FlowRuleManager.loadRules(Collections.singletonList(rule)); FlowRuleManager.loadRules(Collections.singletonList(rule));
} }
@Test @Test
public void testGrpcServerInterceptor() throws Exception { public void testGrpcServerInterceptor() throws Exception {
final int port = 19329; final int port = 19329;
server.start(port, true);
client = new FooServiceClient("localhost", port); client = new FooServiceClient("localhost", port);
configureFlowRule(threshold); configureFlowRule(Integer.MAX_VALUE);
server.start(port, true); assertTrue(sendRequest(FooRequest.newBuilder().setName("Sentinel").setId(666).build()));
assertTrue(sendRequest());
ClusterNode clusterNode = ClusterBuilderSlot.getClusterNode(resourceName, EntryType.IN); ClusterNode clusterNode = ClusterBuilderSlot.getClusterNode(resourceName, EntryType.IN);
assertNotNull(clusterNode); assertNotNull(clusterNode);
assertEquals(1, clusterNode.totalRequest() - clusterNode.blockRequest()); assertEquals(1, clusterNode.totalPass());
// Not allowed to pass. // Not allowed to pass.
configureFlowRule(0); configureFlowRule(0);
// The second request will be blocked. // The second request will be blocked.
assertFalse(sendRequest()); assertFalse(sendRequest(FooRequest.newBuilder().setName("Sentinel").setId(666).build()));
assertEquals(1, clusterNode.blockRequest()); assertEquals(1, clusterNode.blockRequest());
configureFlowRule(Integer.MAX_VALUE);
assertFalse(sendRequest(FooRequest.newBuilder().setName("Sentinel").setId(-1).build()));
assertEquals(1, clusterNode.totalException());
configureFlowRule(Integer.MAX_VALUE);
assertTrue(sendRequest(FooRequest.newBuilder().setName("Sentinel").setId(-2).build()));
assertTrue(clusterNode.avgRt() >= 1000);
server.stop(); server.stop();
} }
private boolean sendRequest() { private boolean sendRequest(FooRequest request) {
try { try {
FooResponse response = client.anotherHello(FooRequest.newBuilder().setName("Sentinel").setId(666).build()); FooResponse response = client.anotherHello(request);
System.out.println("Response: " + response); System.out.println("Response: " + response);
return true; return true;
} catch (StatusRuntimeException ex) { } catch (StatusRuntimeException ex) {
@ -89,10 +95,15 @@ public class SentinelGrpcServerInterceptorTest {
} }
} }
@Before
@After public void cleanUpBefore() {
public void cleanUp() {
FlowRuleManager.loadRules(null); FlowRuleManager.loadRules(null);
ClusterBuilderSlot.getClusterNodeMap().clear(); ClusterBuilderSlot.getClusterNodeMap().clear();
} }
}
@After
public void cleanUpAfter() {
FlowRuleManager.loadRules(null);
ClusterBuilderSlot.getClusterNodeMap().clear();
}
}

View File

@ -17,14 +17,6 @@ message FooResponse {
// Example service definition. // Example service definition.
service FooService { service FooService {
rpc sayHello(FooRequest) returns (FooResponse) {} rpc sayHello(FooRequest) returns (FooResponse) {}
rpc anotherHello(FooRequest) returns (FooResponse) {} rpc anotherHello(FooRequest) returns (FooResponse) {}
}
rpc helloWithEx(FooRequest) returns (FooResponse) {}
rpc anotherHelloWithEx(FooRequest) returns (FooResponse) {}
}