Support multiple tokens per request entry (#380)
- Refactor MetricBucket to support add multiple count - Refactor Node and Metric related classes - Refactor for StatisticSlot Signed-off-by: Eric Zhao <sczyh16@gmail.com>
This commit is contained in:
parent
832d6e425d
commit
9165fe0d61
|
|
@ -93,10 +93,11 @@ public class ClusterNode extends StatisticNode {
|
|||
* @param count count to add
|
||||
*/
|
||||
public void trace(Throwable throwable, int count) {
|
||||
if (count <= 0) {
|
||||
return;
|
||||
}
|
||||
if (!BlockException.isBlockException(throwable)) {
|
||||
for (int i = 0; i < count; i++) {
|
||||
this.increaseExceptionQps();
|
||||
}
|
||||
this.increaseExceptionQps(count);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -107,21 +107,21 @@ public class DefaultNode extends StatisticNode {
|
|||
}
|
||||
|
||||
@Override
|
||||
public void increaseBlockQps() {
|
||||
super.increaseBlockQps();
|
||||
this.clusterNode.increaseBlockQps();
|
||||
public void increaseBlockQps(int count) {
|
||||
super.increaseBlockQps(count);
|
||||
this.clusterNode.increaseBlockQps(count);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void increaseExceptionQps() {
|
||||
super.increaseExceptionQps();
|
||||
this.clusterNode.increaseExceptionQps();
|
||||
public void increaseExceptionQps(int count) {
|
||||
super.increaseExceptionQps(count);
|
||||
this.clusterNode.increaseExceptionQps(count);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void rt(long rt) {
|
||||
super.rt(rt);
|
||||
this.clusterNode.rt(rt);
|
||||
public void addRtAndSuccess(long rt, int successCount) {
|
||||
super.addRtAndSuccess(rt, successCount);
|
||||
this.clusterNode.addRtAndSuccess(rt, successCount);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
@ -137,9 +137,9 @@ public class DefaultNode extends StatisticNode {
|
|||
}
|
||||
|
||||
@Override
|
||||
public void addPassRequest() {
|
||||
super.addPassRequest();
|
||||
this.clusterNode.addPassRequest();
|
||||
public void addPassRequest(int count) {
|
||||
super.addPassRequest(count);
|
||||
this.clusterNode.addPassRequest(count);
|
||||
}
|
||||
|
||||
public void printDefaultNode() {
|
||||
|
|
|
|||
|
|
@ -26,7 +26,6 @@ import com.alibaba.csp.sentinel.node.metric.MetricNode;
|
|||
* @author qinan.qn
|
||||
* @author leyou
|
||||
* @author Eric Zhao
|
||||
* @author leitao
|
||||
*/
|
||||
public interface Node {
|
||||
|
||||
|
|
@ -122,25 +121,28 @@ public interface Node {
|
|||
|
||||
/**
|
||||
* Add pass count.
|
||||
*
|
||||
* @param count count to add pass
|
||||
*/
|
||||
void addPassRequest();
|
||||
void addPassRequest(int count);
|
||||
|
||||
/**
|
||||
* Add rt and success count.
|
||||
*
|
||||
* @param rt response time
|
||||
* @param success success count to add
|
||||
*/
|
||||
void rt(long rt);
|
||||
void addRtAndSuccess(long rt, int success);
|
||||
|
||||
/**
|
||||
* Increase the block count.
|
||||
*/
|
||||
void increaseBlockQps();
|
||||
void increaseBlockQps(int count);
|
||||
|
||||
/**
|
||||
* Increase the biz exception count.
|
||||
*/
|
||||
void increaseExceptionQps();
|
||||
void increaseExceptionQps(int count);
|
||||
|
||||
/**
|
||||
* Increase current thread count.
|
||||
|
|
|
|||
|
|
@ -227,30 +227,30 @@ public class StatisticNode implements Node {
|
|||
}
|
||||
|
||||
@Override
|
||||
public void addPassRequest() {
|
||||
rollingCounterInSecond.addPass();
|
||||
rollingCounterInMinute.addPass();
|
||||
public void addPassRequest(int count) {
|
||||
rollingCounterInSecond.addPass(count);
|
||||
rollingCounterInMinute.addPass(count);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void rt(long rt) {
|
||||
rollingCounterInSecond.addSuccess();
|
||||
public void addRtAndSuccess(long rt, int successCount) {
|
||||
rollingCounterInSecond.addSuccess(successCount);
|
||||
rollingCounterInSecond.addRT(rt);
|
||||
|
||||
rollingCounterInMinute.addSuccess();
|
||||
rollingCounterInMinute.addSuccess(successCount);
|
||||
rollingCounterInMinute.addRT(rt);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void increaseBlockQps() {
|
||||
rollingCounterInSecond.addBlock();
|
||||
rollingCounterInMinute.addBlock();
|
||||
public void increaseBlockQps(int count) {
|
||||
rollingCounterInSecond.addBlock(count);
|
||||
rollingCounterInMinute.addBlock(count);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void increaseExceptionQps() {
|
||||
rollingCounterInSecond.addException();
|
||||
rollingCounterInMinute.addException();
|
||||
public void increaseExceptionQps(int count) {
|
||||
rollingCounterInSecond.addException(count);
|
||||
rollingCounterInMinute.addException(count);
|
||||
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -56,18 +56,18 @@ public class StatisticSlot extends AbstractLinkedProcessorSlot<DefaultNode> {
|
|||
|
||||
// Request passed, add thread count and pass count.
|
||||
node.increaseThreadNum();
|
||||
node.addPassRequest();
|
||||
node.addPassRequest(count);
|
||||
|
||||
if (context.getCurEntry().getOriginNode() != null) {
|
||||
// Add count for origin node.
|
||||
context.getCurEntry().getOriginNode().increaseThreadNum();
|
||||
context.getCurEntry().getOriginNode().addPassRequest();
|
||||
context.getCurEntry().getOriginNode().addPassRequest(count);
|
||||
}
|
||||
|
||||
if (resourceWrapper.getType() == EntryType.IN) {
|
||||
// Add count for global inbound entry node for global statistics.
|
||||
Constants.ENTRY_NODE.increaseThreadNum();
|
||||
Constants.ENTRY_NODE.addPassRequest();
|
||||
Constants.ENTRY_NODE.addPassRequest(count);
|
||||
}
|
||||
|
||||
// Handle pass event with registered entry callback handlers.
|
||||
|
|
@ -79,14 +79,14 @@ public class StatisticSlot extends AbstractLinkedProcessorSlot<DefaultNode> {
|
|||
context.getCurEntry().setError(e);
|
||||
|
||||
// Add block count.
|
||||
node.increaseBlockQps();
|
||||
node.increaseBlockQps(count);
|
||||
if (context.getCurEntry().getOriginNode() != null) {
|
||||
context.getCurEntry().getOriginNode().increaseBlockQps();
|
||||
context.getCurEntry().getOriginNode().increaseBlockQps(count);
|
||||
}
|
||||
|
||||
if (resourceWrapper.getType() == EntryType.IN) {
|
||||
// Add count for global inbound entry node for global statistics.
|
||||
Constants.ENTRY_NODE.increaseBlockQps();
|
||||
Constants.ENTRY_NODE.increaseBlockQps(count);
|
||||
}
|
||||
|
||||
// Handle block event with registered entry callback handlers.
|
||||
|
|
@ -100,13 +100,13 @@ public class StatisticSlot extends AbstractLinkedProcessorSlot<DefaultNode> {
|
|||
context.getCurEntry().setError(e);
|
||||
|
||||
// This should not happen.
|
||||
node.increaseExceptionQps();
|
||||
node.increaseExceptionQps(count);
|
||||
if (context.getCurEntry().getOriginNode() != null) {
|
||||
context.getCurEntry().getOriginNode().increaseExceptionQps();
|
||||
context.getCurEntry().getOriginNode().increaseExceptionQps(count);
|
||||
}
|
||||
|
||||
if (resourceWrapper.getType() == EntryType.IN) {
|
||||
Constants.ENTRY_NODE.increaseExceptionQps();
|
||||
Constants.ENTRY_NODE.increaseExceptionQps(count);
|
||||
}
|
||||
throw e;
|
||||
}
|
||||
|
|
@ -124,9 +124,9 @@ public class StatisticSlot extends AbstractLinkedProcessorSlot<DefaultNode> {
|
|||
}
|
||||
|
||||
// Record response time and success count.
|
||||
node.rt(rt);
|
||||
node.addRtAndSuccess(rt, count);
|
||||
if (context.getCurEntry().getOriginNode() != null) {
|
||||
context.getCurEntry().getOriginNode().rt(rt);
|
||||
context.getCurEntry().getOriginNode().addRtAndSuccess(rt, count);
|
||||
}
|
||||
|
||||
node.decreaseThreadNum();
|
||||
|
|
@ -136,7 +136,7 @@ public class StatisticSlot extends AbstractLinkedProcessorSlot<DefaultNode> {
|
|||
}
|
||||
|
||||
if (resourceWrapper.getType() == EntryType.IN) {
|
||||
Constants.ENTRY_NODE.rt(rt);
|
||||
Constants.ENTRY_NODE.addRtAndSuccess(rt, count);
|
||||
Constants.ENTRY_NODE.decreaseThreadNum();
|
||||
}
|
||||
} else {
|
||||
|
|
|
|||
|
|
@ -90,20 +90,20 @@ public class MetricBucket {
|
|||
return get(MetricEvent.SUCCESS);
|
||||
}
|
||||
|
||||
public void addPass() {
|
||||
add(MetricEvent.PASS, 1);
|
||||
public void addPass(int n) {
|
||||
add(MetricEvent.PASS, n);
|
||||
}
|
||||
|
||||
public void addException() {
|
||||
add(MetricEvent.EXCEPTION, 1);
|
||||
public void addException(int n) {
|
||||
add(MetricEvent.EXCEPTION, n);
|
||||
}
|
||||
|
||||
public void addBlock() {
|
||||
add(MetricEvent.BLOCK, 1);
|
||||
public void addBlock(int n) {
|
||||
add(MetricEvent.BLOCK, n);
|
||||
}
|
||||
|
||||
public void addSuccess() {
|
||||
add(MetricEvent.SUCCESS, 1);
|
||||
public void addSuccess(int n) {
|
||||
add(MetricEvent.SUCCESS, n);
|
||||
}
|
||||
|
||||
public void addRT(long rt) {
|
||||
|
|
|
|||
|
|
@ -162,27 +162,27 @@ public class ArrayMetric implements Metric {
|
|||
}
|
||||
|
||||
@Override
|
||||
public void addException() {
|
||||
public void addException(int count) {
|
||||
WindowWrap<MetricBucket> wrap = data.currentWindow();
|
||||
wrap.value().addException();
|
||||
wrap.value().addException(count);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void addBlock() {
|
||||
public void addBlock(int count) {
|
||||
WindowWrap<MetricBucket> wrap = data.currentWindow();
|
||||
wrap.value().addBlock();
|
||||
wrap.value().addBlock(count);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void addSuccess() {
|
||||
public void addSuccess(int count) {
|
||||
WindowWrap<MetricBucket> wrap = data.currentWindow();
|
||||
wrap.value().addSuccess();
|
||||
wrap.value().addSuccess(count);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void addPass() {
|
||||
public void addPass(int count) {
|
||||
WindowWrap<MetricBucket> wrap = data.currentWindow();
|
||||
wrap.value().addPass();
|
||||
wrap.value().addPass(count);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
|||
|
|
@ -94,22 +94,22 @@ public interface Metric {
|
|||
/**
|
||||
* Increment by one the current exception count.
|
||||
*/
|
||||
void addException();
|
||||
void addException(int n);
|
||||
|
||||
/**
|
||||
* Increment by one the current blovk count.
|
||||
* Increment by one the current block count.
|
||||
*/
|
||||
void addBlock();
|
||||
void addBlock(int n);
|
||||
|
||||
/**
|
||||
* Increment by one the current success count.
|
||||
*/
|
||||
void addSuccess();
|
||||
void addSuccess(int n);
|
||||
|
||||
/**
|
||||
* Increment by one the current pass count.
|
||||
*/
|
||||
void addPass();
|
||||
void addPass(int n);
|
||||
|
||||
/**
|
||||
* Add given RT to current total RT.
|
||||
|
|
|
|||
|
|
@ -54,16 +54,16 @@ public class ArrayMetricTest {
|
|||
|
||||
metric.addRT(expectedRt);
|
||||
for (int i = 0; i < expectedPass; i++) {
|
||||
metric.addPass();
|
||||
metric.addPass(1);
|
||||
}
|
||||
for (int i = 0; i < expectedBlock; i++) {
|
||||
metric.addBlock();
|
||||
metric.addBlock(1);
|
||||
}
|
||||
for (int i = 0; i < expectedSuccess; i++) {
|
||||
metric.addSuccess();
|
||||
metric.addSuccess(1);
|
||||
}
|
||||
for (int i = 0; i < expectedException; i++) {
|
||||
metric.addException();
|
||||
metric.addException(1);
|
||||
}
|
||||
|
||||
assertEquals(expectedPass, metric.pass());
|
||||
|
|
|
|||
|
|
@ -79,8 +79,8 @@ public class MetricsLeapArrayTest {
|
|||
MetricBucket currentWindow = window.value();
|
||||
assertNotNull(currentWindow);
|
||||
|
||||
currentWindow.addPass();
|
||||
currentWindow.addBlock();
|
||||
currentWindow.addPass(1);
|
||||
currentWindow.addBlock(1);
|
||||
assertEquals(1L, currentWindow.pass());
|
||||
assertEquals(1L, currentWindow.block());
|
||||
|
||||
|
|
@ -90,7 +90,7 @@ public class MetricsLeapArrayTest {
|
|||
assertEquals(previousWindowStart, window.windowStart());
|
||||
|
||||
MetricBucket middleWindow = window.value();
|
||||
middleWindow.addPass();
|
||||
middleWindow.addPass(1);
|
||||
assertSame(currentWindow, middleWindow);
|
||||
assertEquals(2L, middleWindow.pass());
|
||||
assertEquals(1L, middleWindow.block());
|
||||
|
|
@ -114,7 +114,7 @@ public class MetricsLeapArrayTest {
|
|||
List<WindowWrap<MetricBucket>> firstIterWindowList = new ArrayList<WindowWrap<MetricBucket>>(len);
|
||||
for (int i = 0; i < len; i++) {
|
||||
WindowWrap<MetricBucket> w = leapArray.currentWindow(firstTime + windowLengthInMs * i);
|
||||
w.value().addPass();
|
||||
w.value().addPass(1);
|
||||
firstIterWindowList.add(i, w);
|
||||
}
|
||||
|
||||
|
|
@ -133,7 +133,7 @@ public class MetricsLeapArrayTest {
|
|||
Runnable task = new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
leapArray.currentWindow(time).value().addPass();
|
||||
leapArray.currentWindow(time).value().addPass(1);
|
||||
latch.countDown();
|
||||
}
|
||||
};
|
||||
|
|
@ -183,7 +183,7 @@ public class MetricsLeapArrayTest {
|
|||
Thread.sleep(windowLengthInMs + intervalInMs);
|
||||
|
||||
// This will replace the deprecated bucket, so all deprecated buckets will be reset.
|
||||
leapArray.currentWindow(time + windowLengthInMs + intervalInMs).value().addPass();
|
||||
leapArray.currentWindow(time + windowLengthInMs + intervalInMs).value().addPass(1);
|
||||
|
||||
assertEquals(1, leapArray.list().size());
|
||||
}
|
||||
|
|
@ -212,7 +212,7 @@ public class MetricsLeapArrayTest {
|
|||
|
||||
// This won't hit deprecated bucket, so no deprecated buckets will be reset.
|
||||
// But deprecated buckets can be filtered when collecting list.
|
||||
leapArray.currentWindow(TimeUtil.currentTimeMillis()).value().addPass();
|
||||
leapArray.currentWindow(TimeUtil.currentTimeMillis()).value().addPass(1);
|
||||
|
||||
assertEquals(1, leapArray.list().size());
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in New Issue