Refactor extended MetricExtension interface (matching events in Sentinel)

- Unify the extended interface as a few event handlers: onPass, onBlocked, onComplete and onError
- Polish related code

Signed-off-by: Eric Zhao <sczyh16@gmail.com>
This commit is contained in:
Eric Zhao 2020-08-18 17:33:05 +08:00 committed by Jason Joo
parent 58ff01e39b
commit a88288715a
9 changed files with 291 additions and 209 deletions

View File

@ -1,97 +1,74 @@
/*
* Copyright 1999-2019 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.csp.sentinel.metric.extension; package com.alibaba.csp.sentinel.metric.extension;
import com.alibaba.csp.sentinel.EntryType; import com.alibaba.csp.sentinel.EntryType;
import com.alibaba.csp.sentinel.slotchain.ResourceWrapper;
import com.alibaba.csp.sentinel.slots.block.BlockException; import com.alibaba.csp.sentinel.slots.block.BlockException;
/** /**
* Advanced {@link MetricExtension} extending input parameters of each metric * Extended {@link MetricExtension} extending input parameters of each metric
* collection method with the name of {@link EntryType}. * collection method with {@link EntryType}.
* *
* @author bill_yip * @author bill_yip
* @author Eric Zhao
* @since 1.8.0 * @since 1.8.0
*/ */
public interface AdvancedMetricExtension extends MetricExtension { public interface AdvancedMetricExtension extends MetricExtension {
/**
* Add current pass count of the resource name.
*
* @param n count to add
* @param resource resource name
* @param entryType {@link EntryType} name, [IN] as provider, [OUT] as consumer.
* @param args additional arguments of the resource, eg. if the resource is
* a method name, the args will be the parameters of the
* method.
*/
void addPass(String resource, String entryType, int n, Object... args);
/** /**
* Add current block count of the resource name. * Add current pass count of the resource name.
* *
* @param n count to add * @param rw resource representation (including resource name, traffic type, etc.)
* @param resource resource name * @param batchCount count to add
* @param entryType {@link EntryType} name, [IN] as provider, [OUT] as * @param args additional arguments of the resource, eg. if the resource is a method name,
* consumer. * the args will be the parameters of the method.
* @param origin the original invoker. */
* @param blockException block exception related. void onPass(ResourceWrapper rw, int batchCount, Object[] args);
* @param args additional arguments of the resource, eg. if the
* resource is a method name, the args will be the
* parameters of the method.
*/
void addBlock(String resource, String entryType, int n, String origin, BlockException blockException,
Object... args);
/** /**
* Add current completed count of the resource name. * Add current block count of the resource name.
* *
* @param n count to add * @param rw resource representation (including resource name, traffic type, etc.)
* @param resource resource name * @param batchCount count to add
* @param entryType {@link EntryType} name, [IN] as provider, [OUT] as consumer. * @param origin the origin of caller (if present)
* @param args additional arguments of the resource, eg. if the resource is * @param e the associated {@code BlockException}
* a method name, the args will be the parameters of the * @param args additional arguments of the resource, eg. if the resource is a method name,
* method. * the args will be the parameters of the method.
*/ */
void addSuccess(String resource, String entryType, int n, Object... args); void onBlocked(ResourceWrapper rw, int batchCount, String origin, BlockException e,
Object[] args);
/** /**
* Add current exception count of the resource name. * Add current completed count of the resource name.
* *
* @param n count to add * @param rw resource representation (including resource name, traffic type, etc.)
* @param resource resource name * @param batchCount count to add
* @param entryType {@link EntryType} name, [IN] as provider, [OUT] as consumer. * @param rt response time of current invocation
* @param throwable exception related. * @param args additional arguments of the resource
*/ */
void addException(String resource, String entryType, int n, Throwable throwable); void onComplete(ResourceWrapper rw, long rt, int batchCount, Object[] args);
/** /**
* Add response time of the resource name. * Add current exception count of the resource name.
* *
* @param rt response time in millisecond * @param rw resource representation (including resource name, traffic type, etc.)
* @param resource resource name * @param batchCount count to add
* @param entryType {@link EntryType} name, [IN] as provider, [OUT] as consumer. * @param throwable exception related.
* @param args additional arguments of the resource, eg. if the resource is * @param args additional arguments of the resource
* a method name, the args will be the parameters of the */
* method. void onError(ResourceWrapper rw, Throwable throwable, int batchCount, Object[] args);
*/
void addRt(String resource, String entryTypeTag, long rt, Object... args);
/**
* Increase current thread count of the resource name.
*
* @param resource resource name
* @param entryType {@link EntryType} name, [IN] as provider, [OUT] as consumer.
* @param args additional arguments of the resource, eg. if the resource is
* a method name, the args will be the parameters of the
* method.
*/
void increaseThreadNum(String resource, String entryType, Object... args);
/**
* Decrease current thread count of the resource name.
*
* @param resource resource name
* @param entryType {@link EntryType} name, [IN] as provider, [OUT] as consumer.
* @param args additional arguments of the resource, eg. if the resource is
* a method name, the args will be the parameters of the
* method.
*/
void decreaseThreadNum(String resource, String entryType, Object... args);
} }

View File

@ -1,3 +1,18 @@
/*
* Copyright 1999-2019 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.csp.sentinel.metric.extension; package com.alibaba.csp.sentinel.metric.extension;
import com.alibaba.csp.sentinel.init.InitFunc; import com.alibaba.csp.sentinel.init.InitFunc;

View File

@ -1,3 +1,18 @@
/*
* Copyright 1999-2019 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.csp.sentinel.metric.extension; package com.alibaba.csp.sentinel.metric.extension;
import com.alibaba.csp.sentinel.slots.block.BlockException; import com.alibaba.csp.sentinel.slots.block.BlockException;

View File

@ -1,3 +1,18 @@
/*
* Copyright 1999-2019 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.csp.sentinel.metric.extension; package com.alibaba.csp.sentinel.metric.extension;
import java.util.ArrayList; import java.util.ArrayList;
@ -7,7 +22,7 @@ import com.alibaba.csp.sentinel.log.RecordLog;
import com.alibaba.csp.sentinel.util.SpiLoader; import com.alibaba.csp.sentinel.util.SpiLoader;
/** /**
* Get all {@link MetricExtension}s via SPI. * Get all {@link MetricExtension} via SPI.
* *
* @author Carpenter Lee * @author Carpenter Lee
* @since 1.6.1 * @since 1.6.1
@ -22,8 +37,8 @@ public class MetricExtensionProvider {
private static void resolveInstance() { private static void resolveInstance() {
List<MetricExtension> extensions = SpiLoader.loadInstanceList(MetricExtension.class); List<MetricExtension> extensions = SpiLoader.loadInstanceList(MetricExtension.class);
if (extensions == null) { if (extensions.isEmpty()) {
RecordLog.warn("[MetricExtensionProvider] WARN: No existing MetricExtension found"); RecordLog.info("[MetricExtensionProvider] No existing MetricExtension found");
} else { } else {
metricExtensions.addAll(extensions); metricExtensions.addAll(extensions);
RecordLog.info("[MetricExtensionProvider] MetricExtension resolved, size=" + extensions.size()); RecordLog.info("[MetricExtensionProvider] MetricExtension resolved, size=" + extensions.size());
@ -31,9 +46,10 @@ public class MetricExtensionProvider {
} }
/** /**
* Get all metric extensions. DO NOT MODIFY the returned list, use {@link #addMetricExtension(MetricExtension)}. * <p>Get all registered metric extensions.</p>
* <p>DO NOT MODIFY the returned list, use {@link #addMetricExtension(MetricExtension)}.</p>
* *
* @return all metric extensions. * @return all registered metric extensions
*/ */
public static List<MetricExtension> getMetricExtensions() { public static List<MetricExtension> getMetricExtensions() {
return metricExtensions; return metricExtensions;
@ -42,7 +58,7 @@ public class MetricExtensionProvider {
/** /**
* Add metric extension. * Add metric extension.
* <p> * <p>
* Not that this method is NOT thread safe. * Note that this method is NOT thread safe.
* </p> * </p>
* *
* @param metricExtension the metric extension to add. * @param metricExtension the metric extension to add.

View File

@ -1,3 +1,18 @@
/*
* Copyright 1999-2019 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.csp.sentinel.metric.extension.callback; package com.alibaba.csp.sentinel.metric.extension.callback;
import com.alibaba.csp.sentinel.context.Context; import com.alibaba.csp.sentinel.context.Context;
@ -16,32 +31,29 @@ import com.alibaba.csp.sentinel.slots.block.BlockException;
* @since 1.6.1 * @since 1.6.1
*/ */
public class MetricEntryCallback implements ProcessorSlotEntryCallback<DefaultNode> { public class MetricEntryCallback implements ProcessorSlotEntryCallback<DefaultNode> {
@Override
public void onPass(Context context, ResourceWrapper resourceWrapper, DefaultNode param, int count, Object... args)
throws Exception {
for (MetricExtension m : MetricExtensionProvider.getMetricExtensions()) {
if (m instanceof AdvancedMetricExtension) {
((AdvancedMetricExtension) m).increaseThreadNum(resourceWrapper.getName(),
resourceWrapper.getEntryType().name(), args);
((AdvancedMetricExtension) m).addPass(resourceWrapper.getName(), resourceWrapper.getEntryType().name(),
count, args);
} else {
m.increaseThreadNum(resourceWrapper.getName(), args);
m.addPass(resourceWrapper.getName(), count, args);
}
}
}
@Override @Override
public void onBlocked(BlockException ex, Context context, ResourceWrapper resourceWrapper, DefaultNode param, public void onPass(Context context, ResourceWrapper rw, DefaultNode param, int count, Object... args)
int count, Object... args) { throws Exception {
for (MetricExtension m : MetricExtensionProvider.getMetricExtensions()) { for (MetricExtension m : MetricExtensionProvider.getMetricExtensions()) {
if (m instanceof AdvancedMetricExtension) { if (m instanceof AdvancedMetricExtension) {
((AdvancedMetricExtension) m).addBlock(resourceWrapper.getName(), resourceWrapper.getEntryType().name(), ((AdvancedMetricExtension) m).onPass(rw, count, args);
count, context.getOrigin(), ex, args); } else {
} else { m.increaseThreadNum(rw.getName(), args);
m.addBlock(resourceWrapper.getName(), count, context.getOrigin(), ex, args); m.addPass(rw.getName(), count, args);
} }
} }
} }
@Override
public void onBlocked(BlockException ex, Context context, ResourceWrapper resourceWrapper, DefaultNode param,
int count, Object... args) {
for (MetricExtension m : MetricExtensionProvider.getMetricExtensions()) {
if (m instanceof AdvancedMetricExtension) {
((AdvancedMetricExtension) m).onBlocked(resourceWrapper, count, context.getOrigin(), ex, args);
} else {
m.addBlock(resourceWrapper.getName(), count, context.getOrigin(), ex, args);
}
}
}
} }

View File

@ -1,5 +1,21 @@
/*
* Copyright 1999-2019 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.csp.sentinel.metric.extension.callback; package com.alibaba.csp.sentinel.metric.extension.callback;
import com.alibaba.csp.sentinel.Entry;
import com.alibaba.csp.sentinel.context.Context; import com.alibaba.csp.sentinel.context.Context;
import com.alibaba.csp.sentinel.metric.extension.AdvancedMetricExtension; import com.alibaba.csp.sentinel.metric.extension.AdvancedMetricExtension;
import com.alibaba.csp.sentinel.metric.extension.MetricExtension; import com.alibaba.csp.sentinel.metric.extension.MetricExtension;
@ -12,35 +28,43 @@ import com.alibaba.csp.sentinel.util.TimeUtil;
* Metric extension exit callback. * Metric extension exit callback.
* *
* @author Carpenter Lee * @author Carpenter Lee
* @author Eric Zhao
* @since 1.6.1 * @since 1.6.1
*/ */
public class MetricExitCallback implements ProcessorSlotExitCallback { public class MetricExitCallback implements ProcessorSlotExitCallback {
@Override @Override
public void onExit(Context context, ResourceWrapper resourceWrapper, int count, Object... args) { public void onExit(Context context, ResourceWrapper rw, int acquireCount, Object... args) {
for (MetricExtension m : MetricExtensionProvider.getMetricExtensions()) { Entry curEntry = context.getCurEntry();
if (context.getCurEntry().getBlockError() != null) { if (curEntry == null) {
continue; return;
} }
String resource = resourceWrapper.getName(); for (MetricExtension m : MetricExtensionProvider.getMetricExtensions()) {
String entryType = resourceWrapper.getEntryType().name(); if (curEntry.getBlockError() != null) {
Throwable ex = context.getCurEntry().getError(); continue;
long realRt = TimeUtil.currentTimeMillis() - context.getCurEntry().getCreateTimestamp(); }
if (m instanceof AdvancedMetricExtension) { String resource = rw.getName();
((AdvancedMetricExtension) m).addRt(resource, entryType, realRt, args); Throwable ex = curEntry.getError();
((AdvancedMetricExtension) m).addSuccess(resource, entryType, count, args); long completeTime = curEntry.getCompleteTimestamp();
((AdvancedMetricExtension) m).decreaseThreadNum(resource, entryType, args); if (completeTime <= 0) {
if (null != ex) { completeTime = TimeUtil.currentTimeMillis();
((AdvancedMetricExtension) m).addException(resource, entryType, count, ex); }
} long rt = completeTime - curEntry.getCreateTimestamp();
} else {
m.addRt(resource, realRt, args); if (m instanceof AdvancedMetricExtension) {
m.addSuccess(resource, count, args); // Since 1.8.0 (as a temporary workaround for compatibility)
m.decreaseThreadNum(resource, args); ((AdvancedMetricExtension) m).onComplete(rw, rt, acquireCount, args);
if (null != ex) { if (ex != null) {
m.addException(resource, count, ex); ((AdvancedMetricExtension) m).onError(rw, ex, acquireCount, args);
} }
} } else {
} m.addRt(resource, rt, args);
} m.addSuccess(resource, acquireCount, args);
m.decreaseThreadNum(resource, args);
if (null != ex) {
m.addException(resource, acquireCount, ex);
}
}
}
}
} }

View File

@ -1,85 +1,93 @@
/*
* Copyright 1999-2019 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.csp.sentinel.metric.extension.callback; package com.alibaba.csp.sentinel.metric.extension.callback;
import com.alibaba.csp.sentinel.metric.extension.AdvancedMetricExtension; import com.alibaba.csp.sentinel.metric.extension.AdvancedMetricExtension;
import com.alibaba.csp.sentinel.slotchain.ResourceWrapper;
import com.alibaba.csp.sentinel.slots.block.BlockException; import com.alibaba.csp.sentinel.slots.block.BlockException;
/**
* @author bill_yip
* @author Eric Zhao
*/
class FakeAdvancedMetricExtension implements AdvancedMetricExtension { class FakeAdvancedMetricExtension implements AdvancedMetricExtension {
long pass = 0; long pass = 0;
long block = 0; long block = 0;
long success = 0; long complete = 0;
long exception = 0; long exception = 0;
long rt = 0; long rt = 0;
long thread = 0;
@Override long concurrency = 0;
public void addPass(String resource, int n, Object... args) {
// Do nothing because of using the enhanced one
}
@Override @Override
public void addBlock(String resource, int n, String origin, BlockException blockException, Object... args) { public void onPass(ResourceWrapper rw, int batchCount, Object[] args) {
// Do nothing because of using the enhanced one this.pass += batchCount;
} this.concurrency++;
}
@Override @Override
public void addSuccess(String resource, int n, Object... args) { public void onBlocked(ResourceWrapper rw, int batchCount, String origin, BlockException e, Object[] args) {
// Do nothing because of using the enhanced one this.block += batchCount;
} }
@Override @Override
public void addException(String resource, int n, Throwable throwable) { public void onComplete(ResourceWrapper rw, long rt, int batchCount, Object[] args) {
// Do nothing because of using the enhanced one this.complete += batchCount;
} this.rt += rt;
this.concurrency--;
}
@Override @Override
public void addRt(String resource, long rt, Object... args) { public void onError(ResourceWrapper rw, Throwable throwable, int batchCount, Object[] args) {
// Do nothing because of using the enhanced one this.exception += batchCount;
} }
@Override @Override
public void increaseThreadNum(String resource, Object... args) { public void addPass(String resource, int n, Object... args) {
// Do nothing because of using the enhanced one // Do nothing because of using the enhanced one
} }
@Override @Override
public void decreaseThreadNum(String resource, Object... args) { public void addBlock(String resource, int n, String origin, BlockException blockException, Object... args) {
// Do nothing because of using the enhanced one // Do nothing because of using the enhanced one
} }
@Override @Override
public void addPass(String resource, String entryType, int n, Object... args) { public void addSuccess(String resource, int n, Object... args) {
pass += n; // Do nothing because of using the enhanced one
} }
@Override @Override
public void addBlock(String resource, String entryType, int n, String origin, BlockException blockException, public void addException(String resource, int n, Throwable throwable) {
Object... args) { // Do nothing because of using the enhanced one
block += n; }
}
@Override @Override
public void addSuccess(String resource, String entryType, int n, Object... args) { public void addRt(String resource, long rt, Object... args) {
success += n; // Do nothing because of using the enhanced one
} }
@Override @Override
public void addException(String resource, String entryType, int n, Throwable throwable) { public void increaseThreadNum(String resource, Object... args) {
exception += n; // Do nothing because of using the enhanced one
} }
@Override
public void addRt(String resource, String entryTypeTag, long rt, Object... args) {
this.rt += rt;
}
@Override
public void increaseThreadNum(String resource, String entryType, Object... args) {
thread ++;
}
@Override
public void decreaseThreadNum(String resource, String entryType, Object... args) {
thread --;
}
@Override
public void decreaseThreadNum(String resource, Object... args) {
// Do nothing because of using the enhanced one
}
} }

View File

@ -1,3 +1,18 @@
/*
* Copyright 1999-2019 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.csp.sentinel.metric.extension.callback; package com.alibaba.csp.sentinel.metric.extension.callback;
import com.alibaba.csp.sentinel.EntryType; import com.alibaba.csp.sentinel.EntryType;
@ -35,7 +50,7 @@ public class MetricEntryCallbackTest {
// assert advancedExtension // assert advancedExtension
Assert.assertEquals(advancedExtension.pass, count); Assert.assertEquals(advancedExtension.pass, count);
Assert.assertEquals(advancedExtension.thread, 1); Assert.assertEquals(advancedExtension.concurrency, 1);
} }
@Test @Test

View File

@ -77,8 +77,8 @@ public class MetricExitCallbackTest extends AbstractTimeBasedTest {
Object[] args = {"args1", "args2"}; Object[] args = {"args1", "args2"};
long prevRt = 20; long prevRt = 20;
extension.rt = prevRt; extension.rt = prevRt;
extension.success = 6; extension.complete = 6;
extension.thread = 10; extension.concurrency = 10;
Context context = mock(Context.class); Context context = mock(Context.class);
Entry entry = mock(Entry.class); Entry entry = mock(Entry.class);
@ -92,7 +92,7 @@ public class MetricExitCallbackTest extends AbstractTimeBasedTest {
when(context.getCurEntry()).thenReturn(entry); when(context.getCurEntry()).thenReturn(entry);
exitCallback.onExit(context, resourceWrapper, count, args); exitCallback.onExit(context, resourceWrapper, count, args);
Assert.assertEquals(prevRt + deltaMs, extension.rt); Assert.assertEquals(prevRt + deltaMs, extension.rt);
Assert.assertEquals(extension.success, 6 + count); Assert.assertEquals(extension.complete, 6 + count);
Assert.assertEquals(extension.thread, 10 - 1); Assert.assertEquals(extension.concurrency, 10 - 1);
} }
} }