Add extended interface for metric extension hook to support distinguishing traffic type (#1665)
- Add EntryType args to all hook methods
This commit is contained in:
parent
dae4621e6e
commit
8643dac943
|
|
@ -0,0 +1,97 @@
|
||||||
|
package com.alibaba.csp.sentinel.metric.extension;
|
||||||
|
|
||||||
|
import com.alibaba.csp.sentinel.EntryType;
|
||||||
|
import com.alibaba.csp.sentinel.slots.block.BlockException;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Advanced {@link MetricExtension} extending input parameters of each metric
|
||||||
|
* collection method with the name of {@link EntryType}.
|
||||||
|
*
|
||||||
|
* @author bill_yip
|
||||||
|
* @since 1.8.0
|
||||||
|
*/
|
||||||
|
public interface AdvancedMetricExtension extends MetricExtension {
|
||||||
|
/**
|
||||||
|
* Add current pass count of the resource name.
|
||||||
|
*
|
||||||
|
* @param n count to add
|
||||||
|
* @param resource resource name
|
||||||
|
* @param entryType {@link EntryType} name, [IN] as provider, [OUT] as consumer.
|
||||||
|
* @param args additional arguments of the resource, eg. if the resource is
|
||||||
|
* a method name, the args will be the parameters of the
|
||||||
|
* method.
|
||||||
|
*/
|
||||||
|
void addPass(String resource, String entryType, int n, Object... args);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Add current block count of the resource name.
|
||||||
|
*
|
||||||
|
* @param n count to add
|
||||||
|
* @param resource resource name
|
||||||
|
* @param entryType {@link EntryType} name, [IN] as provider, [OUT] as
|
||||||
|
* consumer.
|
||||||
|
* @param origin the original invoker.
|
||||||
|
* @param blockException block exception related.
|
||||||
|
* @param args additional arguments of the resource, eg. if the
|
||||||
|
* resource is a method name, the args will be the
|
||||||
|
* parameters of the method.
|
||||||
|
*/
|
||||||
|
void addBlock(String resource, String entryType, int n, String origin, BlockException blockException,
|
||||||
|
Object... args);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Add current completed count of the resource name.
|
||||||
|
*
|
||||||
|
* @param n count to add
|
||||||
|
* @param resource resource name
|
||||||
|
* @param entryType {@link EntryType} name, [IN] as provider, [OUT] as consumer.
|
||||||
|
* @param args additional arguments of the resource, eg. if the resource is
|
||||||
|
* a method name, the args will be the parameters of the
|
||||||
|
* method.
|
||||||
|
*/
|
||||||
|
void addSuccess(String resource, String entryType, int n, Object... args);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Add current exception count of the resource name.
|
||||||
|
*
|
||||||
|
* @param n count to add
|
||||||
|
* @param resource resource name
|
||||||
|
* @param entryType {@link EntryType} name, [IN] as provider, [OUT] as consumer.
|
||||||
|
* @param throwable exception related.
|
||||||
|
*/
|
||||||
|
void addException(String resource, String entryType, int n, Throwable throwable);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Add response time of the resource name.
|
||||||
|
*
|
||||||
|
* @param rt response time in millisecond
|
||||||
|
* @param resource resource name
|
||||||
|
* @param entryType {@link EntryType} name, [IN] as provider, [OUT] as consumer.
|
||||||
|
* @param args additional arguments of the resource, eg. if the resource is
|
||||||
|
* a method name, the args will be the parameters of the
|
||||||
|
* method.
|
||||||
|
*/
|
||||||
|
void addRt(String resource, String entryTypeTag, long rt, Object... args);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Increase current thread count of the resource name.
|
||||||
|
*
|
||||||
|
* @param resource resource name
|
||||||
|
* @param entryType {@link EntryType} name, [IN] as provider, [OUT] as consumer.
|
||||||
|
* @param args additional arguments of the resource, eg. if the resource is
|
||||||
|
* a method name, the args will be the parameters of the
|
||||||
|
* method.
|
||||||
|
*/
|
||||||
|
void increaseThreadNum(String resource, String entryType, Object... args);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Decrease current thread count of the resource name.
|
||||||
|
*
|
||||||
|
* @param resource resource name
|
||||||
|
* @param entryType {@link EntryType} name, [IN] as provider, [OUT] as consumer.
|
||||||
|
* @param args additional arguments of the resource, eg. if the resource is
|
||||||
|
* a method name, the args will be the parameters of the
|
||||||
|
* method.
|
||||||
|
*/
|
||||||
|
void decreaseThreadNum(String resource, String entryType, Object... args);
|
||||||
|
}
|
||||||
|
|
@ -1,8 +1,9 @@
|
||||||
package com.alibaba.csp.sentinel.metric.extension.callback;
|
package com.alibaba.csp.sentinel.metric.extension.callback;
|
||||||
|
|
||||||
import com.alibaba.csp.sentinel.context.Context;
|
import com.alibaba.csp.sentinel.context.Context;
|
||||||
import com.alibaba.csp.sentinel.metric.extension.MetricExtensionProvider;
|
import com.alibaba.csp.sentinel.metric.extension.AdvancedMetricExtension;
|
||||||
import com.alibaba.csp.sentinel.metric.extension.MetricExtension;
|
import com.alibaba.csp.sentinel.metric.extension.MetricExtension;
|
||||||
|
import com.alibaba.csp.sentinel.metric.extension.MetricExtensionProvider;
|
||||||
import com.alibaba.csp.sentinel.node.DefaultNode;
|
import com.alibaba.csp.sentinel.node.DefaultNode;
|
||||||
import com.alibaba.csp.sentinel.slotchain.ProcessorSlotEntryCallback;
|
import com.alibaba.csp.sentinel.slotchain.ProcessorSlotEntryCallback;
|
||||||
import com.alibaba.csp.sentinel.slotchain.ResourceWrapper;
|
import com.alibaba.csp.sentinel.slotchain.ResourceWrapper;
|
||||||
|
|
@ -16,19 +17,31 @@ import com.alibaba.csp.sentinel.slots.block.BlockException;
|
||||||
*/
|
*/
|
||||||
public class MetricEntryCallback implements ProcessorSlotEntryCallback<DefaultNode> {
|
public class MetricEntryCallback implements ProcessorSlotEntryCallback<DefaultNode> {
|
||||||
@Override
|
@Override
|
||||||
public void onPass(Context context, ResourceWrapper resourceWrapper, DefaultNode param,
|
public void onPass(Context context, ResourceWrapper resourceWrapper, DefaultNode param, int count, Object... args)
|
||||||
int count, Object... args) throws Exception {
|
throws Exception {
|
||||||
for (MetricExtension m : MetricExtensionProvider.getMetricExtensions()) {
|
for (MetricExtension m : MetricExtensionProvider.getMetricExtensions()) {
|
||||||
|
if (m instanceof AdvancedMetricExtension) {
|
||||||
|
((AdvancedMetricExtension) m).increaseThreadNum(resourceWrapper.getName(),
|
||||||
|
resourceWrapper.getEntryType().name(), args);
|
||||||
|
((AdvancedMetricExtension) m).addPass(resourceWrapper.getName(), resourceWrapper.getEntryType().name(),
|
||||||
|
count, args);
|
||||||
|
} else {
|
||||||
m.increaseThreadNum(resourceWrapper.getName(), args);
|
m.increaseThreadNum(resourceWrapper.getName(), args);
|
||||||
m.addPass(resourceWrapper.getName(), count, args);
|
m.addPass(resourceWrapper.getName(), count, args);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void onBlocked(BlockException ex, Context context, ResourceWrapper resourceWrapper,
|
public void onBlocked(BlockException ex, Context context, ResourceWrapper resourceWrapper, DefaultNode param,
|
||||||
DefaultNode param, int count, Object... args) {
|
int count, Object... args) {
|
||||||
for (MetricExtension m : MetricExtensionProvider.getMetricExtensions()) {
|
for (MetricExtension m : MetricExtensionProvider.getMetricExtensions()) {
|
||||||
|
if (m instanceof AdvancedMetricExtension) {
|
||||||
|
((AdvancedMetricExtension) m).addBlock(resourceWrapper.getName(), resourceWrapper.getEntryType().name(),
|
||||||
|
count, context.getOrigin(), ex, args);
|
||||||
|
} else {
|
||||||
m.addBlock(resourceWrapper.getName(), count, context.getOrigin(), ex, args);
|
m.addBlock(resourceWrapper.getName(), count, context.getOrigin(), ex, args);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
|
||||||
|
|
@ -1,8 +1,9 @@
|
||||||
package com.alibaba.csp.sentinel.metric.extension.callback;
|
package com.alibaba.csp.sentinel.metric.extension.callback;
|
||||||
|
|
||||||
import com.alibaba.csp.sentinel.context.Context;
|
import com.alibaba.csp.sentinel.context.Context;
|
||||||
import com.alibaba.csp.sentinel.metric.extension.MetricExtensionProvider;
|
import com.alibaba.csp.sentinel.metric.extension.AdvancedMetricExtension;
|
||||||
import com.alibaba.csp.sentinel.metric.extension.MetricExtension;
|
import com.alibaba.csp.sentinel.metric.extension.MetricExtension;
|
||||||
|
import com.alibaba.csp.sentinel.metric.extension.MetricExtensionProvider;
|
||||||
import com.alibaba.csp.sentinel.slotchain.ProcessorSlotExitCallback;
|
import com.alibaba.csp.sentinel.slotchain.ProcessorSlotExitCallback;
|
||||||
import com.alibaba.csp.sentinel.slotchain.ResourceWrapper;
|
import com.alibaba.csp.sentinel.slotchain.ResourceWrapper;
|
||||||
import com.alibaba.csp.sentinel.util.TimeUtil;
|
import com.alibaba.csp.sentinel.util.TimeUtil;
|
||||||
|
|
@ -22,15 +23,24 @@ public class MetricExitCallback implements ProcessorSlotExitCallback {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
String resource = resourceWrapper.getName();
|
String resource = resourceWrapper.getName();
|
||||||
|
String entryType = resourceWrapper.getEntryType().name();
|
||||||
|
Throwable ex = context.getCurEntry().getError();
|
||||||
long realRt = TimeUtil.currentTimeMillis() - context.getCurEntry().getCreateTimestamp();
|
long realRt = TimeUtil.currentTimeMillis() - context.getCurEntry().getCreateTimestamp();
|
||||||
|
if (m instanceof AdvancedMetricExtension) {
|
||||||
|
((AdvancedMetricExtension) m).addRt(resource, entryType, realRt, args);
|
||||||
|
((AdvancedMetricExtension) m).addSuccess(resource, entryType, count, args);
|
||||||
|
((AdvancedMetricExtension) m).decreaseThreadNum(resource, entryType, args);
|
||||||
|
if (null != ex) {
|
||||||
|
((AdvancedMetricExtension) m).addException(resource, entryType, count, ex);
|
||||||
|
}
|
||||||
|
} else {
|
||||||
m.addRt(resource, realRt, args);
|
m.addRt(resource, realRt, args);
|
||||||
m.addSuccess(resource, count, args);
|
m.addSuccess(resource, count, args);
|
||||||
m.decreaseThreadNum(resource, args);
|
m.decreaseThreadNum(resource, args);
|
||||||
|
if (null != ex) {
|
||||||
Throwable ex = context.getCurEntry().getError();
|
|
||||||
if (ex != null) {
|
|
||||||
m.addException(resource, count, ex);
|
m.addException(resource, count, ex);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
|
||||||
|
|
@ -0,0 +1,85 @@
|
||||||
|
package com.alibaba.csp.sentinel.metric.extension.callback;
|
||||||
|
|
||||||
|
import com.alibaba.csp.sentinel.metric.extension.AdvancedMetricExtension;
|
||||||
|
import com.alibaba.csp.sentinel.slots.block.BlockException;
|
||||||
|
|
||||||
|
class FakeAdvancedMetricExtension implements AdvancedMetricExtension {
|
||||||
|
long pass = 0;
|
||||||
|
long block = 0;
|
||||||
|
long success = 0;
|
||||||
|
long exception = 0;
|
||||||
|
long rt = 0;
|
||||||
|
long thread = 0;
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void addPass(String resource, int n, Object... args) {
|
||||||
|
// Do nothing because of using the enhanced one
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void addBlock(String resource, int n, String origin, BlockException blockException, Object... args) {
|
||||||
|
// Do nothing because of using the enhanced one
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void addSuccess(String resource, int n, Object... args) {
|
||||||
|
// Do nothing because of using the enhanced one
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void addException(String resource, int n, Throwable throwable) {
|
||||||
|
// Do nothing because of using the enhanced one
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void addRt(String resource, long rt, Object... args) {
|
||||||
|
// Do nothing because of using the enhanced one
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void increaseThreadNum(String resource, Object... args) {
|
||||||
|
// Do nothing because of using the enhanced one
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void decreaseThreadNum(String resource, Object... args) {
|
||||||
|
// Do nothing because of using the enhanced one
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void addPass(String resource, String entryType, int n, Object... args) {
|
||||||
|
pass += n;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void addBlock(String resource, String entryType, int n, String origin, BlockException blockException,
|
||||||
|
Object... args) {
|
||||||
|
block += n;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void addSuccess(String resource, String entryType, int n, Object... args) {
|
||||||
|
success += n;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void addException(String resource, String entryType, int n, Throwable throwable) {
|
||||||
|
exception += n;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void addRt(String resource, String entryTypeTag, long rt, Object... args) {
|
||||||
|
this.rt += rt;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void increaseThreadNum(String resource, String entryType, Object... args) {
|
||||||
|
thread ++;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void decreaseThreadNum(String resource, String entryType, Object... args) {
|
||||||
|
thread --;
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
@ -20,21 +20,30 @@ public class MetricEntryCallbackTest {
|
||||||
@Test
|
@Test
|
||||||
public void onPass() throws Exception {
|
public void onPass() throws Exception {
|
||||||
FakeMetricExtension extension = new FakeMetricExtension();
|
FakeMetricExtension extension = new FakeMetricExtension();
|
||||||
|
FakeAdvancedMetricExtension advancedExtension = new FakeAdvancedMetricExtension();
|
||||||
MetricExtensionProvider.addMetricExtension(extension);
|
MetricExtensionProvider.addMetricExtension(extension);
|
||||||
|
MetricExtensionProvider.addMetricExtension(advancedExtension);
|
||||||
|
|
||||||
MetricEntryCallback entryCallback = new MetricEntryCallback();
|
MetricEntryCallback entryCallback = new MetricEntryCallback();
|
||||||
StringResourceWrapper resourceWrapper = new StringResourceWrapper("resource", EntryType.OUT);
|
StringResourceWrapper resourceWrapper = new StringResourceWrapper("resource", EntryType.OUT);
|
||||||
int count = 2;
|
int count = 2;
|
||||||
Object[] args = {"args1", "args2"};
|
Object[] args = {"args1", "args2"};
|
||||||
entryCallback.onPass(null, resourceWrapper, null, count, args);
|
entryCallback.onPass(null, resourceWrapper, null, count, args);
|
||||||
|
// assert extension
|
||||||
Assert.assertEquals(extension.pass, count);
|
Assert.assertEquals(extension.pass, count);
|
||||||
Assert.assertEquals(extension.thread, 1);
|
Assert.assertEquals(extension.thread, 1);
|
||||||
|
|
||||||
|
// assert advancedExtension
|
||||||
|
Assert.assertEquals(advancedExtension.pass, count);
|
||||||
|
Assert.assertEquals(advancedExtension.thread, 1);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void onBlocked() throws Exception {
|
public void onBlocked() throws Exception {
|
||||||
FakeMetricExtension extension = new FakeMetricExtension();
|
FakeMetricExtension extension = new FakeMetricExtension();
|
||||||
|
FakeAdvancedMetricExtension advancedExtension = new FakeAdvancedMetricExtension();
|
||||||
MetricExtensionProvider.addMetricExtension(extension);
|
MetricExtensionProvider.addMetricExtension(extension);
|
||||||
|
MetricExtensionProvider.addMetricExtension(advancedExtension);
|
||||||
|
|
||||||
MetricEntryCallback entryCallback = new MetricEntryCallback();
|
MetricEntryCallback entryCallback = new MetricEntryCallback();
|
||||||
StringResourceWrapper resourceWrapper = new StringResourceWrapper("resource", EntryType.OUT);
|
StringResourceWrapper resourceWrapper = new StringResourceWrapper("resource", EntryType.OUT);
|
||||||
|
|
@ -43,6 +52,9 @@ public class MetricEntryCallbackTest {
|
||||||
int count = 2;
|
int count = 2;
|
||||||
Object[] args = {"args1", "args2"};
|
Object[] args = {"args1", "args2"};
|
||||||
entryCallback.onBlocked(new FlowException("xx"), context, resourceWrapper, null, count, args);
|
entryCallback.onBlocked(new FlowException("xx"), context, resourceWrapper, null, count, args);
|
||||||
|
// assert extension
|
||||||
Assert.assertEquals(extension.block, count);
|
Assert.assertEquals(extension.block, count);
|
||||||
|
// assert advancedExtension
|
||||||
|
Assert.assertEquals(advancedExtension.block, count);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
@ -62,4 +62,37 @@ public class MetricExitCallbackTest extends AbstractTimeBasedTest {
|
||||||
Assert.assertEquals(extension.success, 6 + count);
|
Assert.assertEquals(extension.success, 6 + count);
|
||||||
Assert.assertEquals(extension.thread, 10 - 1);
|
Assert.assertEquals(extension.thread, 10 - 1);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @author bill_yip
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void advancedExtensionOnExit() {
|
||||||
|
FakeAdvancedMetricExtension extension = new FakeAdvancedMetricExtension();
|
||||||
|
MetricExtensionProvider.addMetricExtension(extension);
|
||||||
|
|
||||||
|
MetricExitCallback exitCallback = new MetricExitCallback();
|
||||||
|
StringResourceWrapper resourceWrapper = new StringResourceWrapper("resource", EntryType.OUT);
|
||||||
|
int count = 2;
|
||||||
|
Object[] args = {"args1", "args2"};
|
||||||
|
long prevRt = 20;
|
||||||
|
extension.rt = prevRt;
|
||||||
|
extension.success = 6;
|
||||||
|
extension.thread = 10;
|
||||||
|
Context context = mock(Context.class);
|
||||||
|
Entry entry = mock(Entry.class);
|
||||||
|
|
||||||
|
// Mock current time
|
||||||
|
long curMillis = System.currentTimeMillis();
|
||||||
|
setCurrentMillis(curMillis);
|
||||||
|
|
||||||
|
int deltaMs = 100;
|
||||||
|
when(entry.getError()).thenReturn(null);
|
||||||
|
when(entry.getCreateTimestamp()).thenReturn(curMillis - deltaMs);
|
||||||
|
when(context.getCurEntry()).thenReturn(entry);
|
||||||
|
exitCallback.onExit(context, resourceWrapper, count, args);
|
||||||
|
Assert.assertEquals(prevRt + deltaMs, extension.rt);
|
||||||
|
Assert.assertEquals(extension.success, 6 + count);
|
||||||
|
Assert.assertEquals(extension.thread, 10 - 1);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue