S =
+ AtomicReferenceFieldUpdater.newUpdater(InheritableBaseSubscriber.class, Subscription.class,
+ "subscription");
+
+ /**
+ * Return current {@link Subscription}
+ *
+ * @return current {@link Subscription}
+ */
+ protected Subscription upstream() {
+ return subscription;
+ }
+
+ @Override
+ public boolean isDisposed() {
+ return subscription == Operators.cancelledSubscription();
+ }
+
+ /**
+ * {@link Disposable#dispose() Dispose} the {@link Subscription} by
+ * {@link Subscription#cancel() cancelling} it.
+ */
+ @Override
+ public void dispose() {
+ cancel();
+ }
+
+ /**
+ * Hook for further processing of onSubscribe's Subscription. Implement this method
+ * to call {@link #request(long)} as an initial request. Values other than the
+ * unbounded {@code Long.MAX_VALUE} imply that you'll also call request in
+ * {@link #hookOnNext(Object)}.
+ * Defaults to request unbounded Long.MAX_VALUE as in {@link #requestUnbounded()}
+ *
+ * @param subscription the subscription to optionally process
+ */
+ protected void hookOnSubscribe(Subscription subscription) {
+ subscription.request(Long.MAX_VALUE);
+ }
+
+ /**
+ * Hook for processing of onNext values. You can call {@link #request(long)} here
+ * to further request data from the source {@code org.reactivestreams.Publisher} if
+ * the {@link #hookOnSubscribe(Subscription) initial request} wasn't unbounded.
+ *
Defaults to doing nothing.
+ *
+ * @param value the emitted value to process
+ */
+ protected void hookOnNext(T value) {
+ // NO-OP
+ }
+
+ /**
+ * Optional hook for completion processing. Defaults to doing nothing.
+ */
+ protected void hookOnComplete() {
+ // NO-OP
+ }
+
+ /**
+ * Optional hook for error processing. Default is to call
+ * {@link Exceptions#errorCallbackNotImplemented(Throwable)}.
+ *
+ * @param throwable the error to process
+ */
+ protected void hookOnError(Throwable throwable) {
+ throw Exceptions.errorCallbackNotImplemented(throwable);
+ }
+
+ /**
+ * Optional hook executed when the subscription is cancelled by calling this
+ * Subscriber's {@link #cancel()} method. Defaults to doing nothing.
+ */
+ protected void hookOnCancel() {
+ //NO-OP
+ }
+
+ /**
+ * Optional hook executed after any of the termination events (onError, onComplete,
+ * cancel). The hook is executed in addition to and after {@link #hookOnError(Throwable)},
+ * {@link #hookOnComplete()} and {@link #hookOnCancel()} hooks, even if these callbacks
+ * fail. Defaults to doing nothing. A failure of the callback will be caught by
+ * {@code Operators#onErrorDropped(Throwable, reactor.util.context.Context)}.
+ *
+ * @param type the type of termination event that triggered the hook
+ * ({@link SignalType#ON_ERROR}, {@link SignalType#ON_COMPLETE} or
+ * {@link SignalType#CANCEL})
+ */
+ protected void hookFinally(SignalType type) {
+ //NO-OP
+ }
+
+ @Override
+ public void onSubscribe(Subscription s) {
+ if (Operators.setOnce(S, this, s)) {
+ try {
+ hookOnSubscribe(s);
+ } catch (Throwable throwable) {
+ onError(Operators.onOperatorError(s, throwable, currentContext()));
+ }
+ }
+ }
+
+ @Override
+ public void onNext(T value) {
+ Objects.requireNonNull(value, "onNext");
+ try {
+ hookOnNext(value);
+ } catch (Throwable throwable) {
+ onError(Operators.onOperatorError(subscription, throwable, value, currentContext()));
+ }
+ }
+
+ protected boolean shouldCallErrorDropHook() {
+ return true;
+ }
+
+ @Override
+ public void onError(Throwable t) {
+ Objects.requireNonNull(t, "onError");
+
+ if (S.getAndSet(this, Operators.cancelledSubscription()) == Operators
+ .cancelledSubscription()) {
+ // Already cancelled concurrently
+
+ // Workaround for Sentinel BlockException:
+ // Here we add a predicate method to decide whether exception should be dropped implicitly
+ // or call the {@code onErrorDropped} hook.
+ if (shouldCallErrorDropHook()) {
+ Operators.onErrorDropped(t, currentContext());
+ }
+
+ return;
+ }
+
+ try {
+ hookOnError(t);
+ } catch (Throwable e) {
+ e = Exceptions.addSuppressed(e, t);
+ Operators.onErrorDropped(e, currentContext());
+ } finally {
+ safeHookFinally(SignalType.ON_ERROR);
+ }
+ }
+
+ @Override
+ public void onComplete() {
+ if (S.getAndSet(this, Operators.cancelledSubscription()) != Operators
+ .cancelledSubscription()) {
+ //we're sure it has not been concurrently cancelled
+ try {
+ hookOnComplete();
+ } catch (Throwable throwable) {
+ //onError itself will short-circuit due to the CancelledSubscription being push above
+ hookOnError(Operators.onOperatorError(throwable, currentContext()));
+ } finally {
+ safeHookFinally(SignalType.ON_COMPLETE);
+ }
+ }
+ }
+
+ @Override
+ public final void request(long n) {
+ if (Operators.validate(n)) {
+ Subscription s = this.subscription;
+ if (s != null) {
+ s.request(n);
+ }
+ }
+ }
+
+ /**
+ * {@link #request(long) Request} an unbounded amount.
+ */
+ public final void requestUnbounded() {
+ request(Long.MAX_VALUE);
+ }
+
+ @Override
+ public final void cancel() {
+ if (Operators.terminate(S, this)) {
+ try {
+ hookOnCancel();
+ } catch (Throwable throwable) {
+ hookOnError(Operators.onOperatorError(subscription, throwable, currentContext()));
+ } finally {
+ safeHookFinally(SignalType.CANCEL);
+ }
+ }
+ }
+
+ void safeHookFinally(SignalType type) {
+ try {
+ hookFinally(type);
+ } catch (Throwable finallyFailure) {
+ Operators.onErrorDropped(finallyFailure, currentContext());
+ }
+ }
+
+ @Override
+ public String toString() {
+ return getClass().getSimpleName();
+ }
+}
diff --git a/sentinel-adapter/sentinel-reactor-adapter/src/main/java/com/alibaba/csp/sentinel/adapter/reactor/MonoSentinelOperator.java b/sentinel-adapter/sentinel-reactor-adapter/src/main/java/com/alibaba/csp/sentinel/adapter/reactor/MonoSentinelOperator.java
new file mode 100644
index 00000000..7ee353c2
--- /dev/null
+++ b/sentinel-adapter/sentinel-reactor-adapter/src/main/java/com/alibaba/csp/sentinel/adapter/reactor/MonoSentinelOperator.java
@@ -0,0 +1,40 @@
+/*
+ * Copyright 1999-2019 Alibaba Group Holding Ltd.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.csp.sentinel.adapter.reactor;
+
+import reactor.core.CoreSubscriber;
+import reactor.core.publisher.Mono;
+import reactor.core.publisher.MonoOperator;
+
+/**
+ * @author Eric Zhao
+ * @since 1.5.0
+ */
+public class MonoSentinelOperator extends MonoOperator {
+
+ private final EntryConfig entryConfig;
+
+ public MonoSentinelOperator(Mono extends T> source, EntryConfig entryConfig) {
+ super(source);
+ EntryConfig.assertValid(entryConfig);
+ this.entryConfig = entryConfig;
+ }
+
+ @Override
+ public void subscribe(CoreSubscriber super T> actual) {
+ source.subscribe(new SentinelReactorSubscriber<>(entryConfig, actual, true));
+ }
+}
diff --git a/sentinel-adapter/sentinel-reactor-adapter/src/main/java/com/alibaba/csp/sentinel/adapter/reactor/ReactorSphU.java b/sentinel-adapter/sentinel-reactor-adapter/src/main/java/com/alibaba/csp/sentinel/adapter/reactor/ReactorSphU.java
new file mode 100644
index 00000000..b872f0a8
--- /dev/null
+++ b/sentinel-adapter/sentinel-reactor-adapter/src/main/java/com/alibaba/csp/sentinel/adapter/reactor/ReactorSphU.java
@@ -0,0 +1,72 @@
+/*
+ * Copyright 1999-2019 Alibaba Group Holding Ltd.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.csp.sentinel.adapter.reactor;
+
+import java.util.concurrent.atomic.AtomicReference;
+
+import com.alibaba.csp.sentinel.AsyncEntry;
+import com.alibaba.csp.sentinel.EntryType;
+import com.alibaba.csp.sentinel.SphU;
+import com.alibaba.csp.sentinel.Tracer;
+import com.alibaba.csp.sentinel.context.Context;
+import com.alibaba.csp.sentinel.slots.block.BlockException;
+
+import reactor.core.publisher.Mono;
+
+/**
+ * A {@link SphU} adapter with Project Reactor.
+ *
+ * @author Eric Zhao
+ * @since 1.5.0
+ */
+public final class ReactorSphU {
+
+ public static Mono entryWith(String resourceName, Mono actual) {
+ return entryWith(resourceName, EntryType.OUT, actual);
+ }
+
+ public static Mono entryWith(String resourceName, EntryType entryType, Mono actual) {
+ final AtomicReference entryWrapper = new AtomicReference<>(null);
+ return Mono.defer(() -> {
+ try {
+ AsyncEntry entry = SphU.asyncEntry(resourceName, entryType);
+ entryWrapper.set(entry);
+ return actual.subscriberContext(context -> {
+ if (entry == null) {
+ return context;
+ }
+ Context sentinelContext = entry.getAsyncContext();
+ if (sentinelContext == null) {
+ return context;
+ }
+ // TODO: check GC friendly?
+ return context.put(SentinelReactorConstants.SENTINEL_CONTEXT_KEY, sentinelContext);
+ }).doOnSuccessOrError((o, t) -> {
+ if (entry != null && entryWrapper.compareAndSet(entry, null)) {
+ if (t != null) {
+ Tracer.traceContext(t, 1, entry.getAsyncContext());
+ }
+ entry.exit();
+ }
+ });
+ } catch (BlockException ex) {
+ return Mono.error(ex);
+ }
+ });
+ }
+
+ private ReactorSphU() {}
+}
diff --git a/sentinel-adapter/sentinel-reactor-adapter/src/main/java/com/alibaba/csp/sentinel/adapter/reactor/SentinelReactorConstants.java b/sentinel-adapter/sentinel-reactor-adapter/src/main/java/com/alibaba/csp/sentinel/adapter/reactor/SentinelReactorConstants.java
new file mode 100644
index 00000000..9819c7e0
--- /dev/null
+++ b/sentinel-adapter/sentinel-reactor-adapter/src/main/java/com/alibaba/csp/sentinel/adapter/reactor/SentinelReactorConstants.java
@@ -0,0 +1,27 @@
+/*
+ * Copyright 1999-2019 Alibaba Group Holding Ltd.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.csp.sentinel.adapter.reactor;
+
+/**
+ * @author Eric Zhao
+ * @since 1.5.0
+ */
+public final class SentinelReactorConstants {
+
+ public static final String SENTINEL_CONTEXT_KEY = "_sentinel_context";
+
+ private SentinelReactorConstants() {}
+}
diff --git a/sentinel-adapter/sentinel-reactor-adapter/src/main/java/com/alibaba/csp/sentinel/adapter/reactor/SentinelReactorSubscriber.java b/sentinel-adapter/sentinel-reactor-adapter/src/main/java/com/alibaba/csp/sentinel/adapter/reactor/SentinelReactorSubscriber.java
new file mode 100644
index 00000000..d95c1f33
--- /dev/null
+++ b/sentinel-adapter/sentinel-reactor-adapter/src/main/java/com/alibaba/csp/sentinel/adapter/reactor/SentinelReactorSubscriber.java
@@ -0,0 +1,166 @@
+/*
+ * Copyright 1999-2019 Alibaba Group Holding Ltd.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.csp.sentinel.adapter.reactor;
+
+import java.util.Optional;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import com.alibaba.csp.sentinel.AsyncEntry;
+import com.alibaba.csp.sentinel.SphU;
+import com.alibaba.csp.sentinel.Tracer;
+import com.alibaba.csp.sentinel.context.ContextUtil;
+import com.alibaba.csp.sentinel.slots.block.BlockException;
+import com.alibaba.csp.sentinel.util.function.Supplier;
+
+import org.reactivestreams.Subscription;
+import reactor.core.CoreSubscriber;
+import reactor.util.context.Context;
+
+/**
+ * @author Eric Zhao
+ * @since 1.5.0
+ */
+public class SentinelReactorSubscriber extends InheritableBaseSubscriber {
+
+ private final EntryConfig entryConfig;
+
+ private final CoreSubscriber super T> actual;
+ private final boolean unary;
+
+ private volatile AsyncEntry currentEntry;
+ private final AtomicBoolean entryExited = new AtomicBoolean(false);
+
+ public SentinelReactorSubscriber(EntryConfig entryConfig,
+ CoreSubscriber super T> actual,
+ boolean unary) {
+ checkEntryConfig(entryConfig);
+ this.entryConfig = entryConfig;
+ this.actual = actual;
+ this.unary = unary;
+ }
+
+ private void checkEntryConfig(EntryConfig config) {
+ EntryConfig.assertValid(config);
+ }
+
+ @Override
+ public Context currentContext() {
+ if (currentEntry == null || entryExited.get()) {
+ return actual.currentContext();
+ }
+ com.alibaba.csp.sentinel.context.Context sentinelContext = currentEntry.getAsyncContext();
+ if (sentinelContext == null) {
+ return actual.currentContext();
+ }
+ return actual.currentContext()
+ .put(SentinelReactorConstants.SENTINEL_CONTEXT_KEY, currentEntry.getAsyncContext());
+ }
+
+ private void doWithContextOrCurrent(Supplier> contextSupplier,
+ Runnable f) {
+ Optional contextOpt = contextSupplier.get();
+ if (!contextOpt.isPresent()) {
+ // Provided context is absent, use current context.
+ f.run();
+ } else {
+ // Run on provided context.
+ ContextUtil.runOnContext(contextOpt.get(), f);
+ }
+ }
+
+ private void entryWhenSubscribed() {
+ ContextConfig sentinelContextConfig = entryConfig.getContextConfig();
+ if (sentinelContextConfig != null) {
+ // If current we're already in a context, the context config won't work.
+ ContextUtil.enter(sentinelContextConfig.getContextName(), sentinelContextConfig.getOrigin());
+ }
+ try {
+ AsyncEntry entry = SphU.asyncEntry(entryConfig.getResourceName());
+ this.currentEntry = entry;
+ actual.onSubscribe(this);
+ } catch (BlockException ex) {
+ // Mark as completed (exited) explicitly.
+ entryExited.set(true);
+ // Signal cancel and propagate the {@code BlockException}.
+ cancel();
+ actual.onSubscribe(this);
+ actual.onError(ex);
+ } finally {
+ if (sentinelContextConfig != null) {
+ ContextUtil.exit();
+ }
+ }
+ }
+
+ @Override
+ protected void hookOnSubscribe(Subscription subscription) {
+ doWithContextOrCurrent(() -> currentContext().getOrEmpty(SentinelReactorConstants.SENTINEL_CONTEXT_KEY),
+ this::entryWhenSubscribed);
+ }
+
+ @Override
+ protected void hookOnNext(T value) {
+ if (isDisposed()) {
+ tryCompleteEntry();
+ return;
+ }
+ doWithContextOrCurrent(() -> Optional.ofNullable(currentEntry).map(AsyncEntry::getAsyncContext),
+ () -> actual.onNext(value));
+
+ if (unary) {
+ // For some cases of unary operator (Mono), we have to do this during onNext hook.
+ // e.g. this kind of order: onSubscribe() -> onNext() -> cancel() -> onComplete()
+ // the onComplete hook will not be executed so we'll need to complete the entry in advance.
+ tryCompleteEntry();
+ }
+ }
+
+ @Override
+ protected void hookOnComplete() {
+ tryCompleteEntry();
+ actual.onComplete();
+ }
+
+ @Override
+ protected boolean shouldCallErrorDropHook() {
+ // When flow control triggered or stream terminated, the incoming
+ // deprecated exceptions should be dropped implicitly, so we'll not call the `onErrorDropped` hook.
+ return !entryExited.get();
+ }
+
+ @Override
+ protected void hookOnError(Throwable t) {
+ if (currentEntry != null && currentEntry.getAsyncContext() != null) {
+ // Normal requests with non-BlockException will go through here.
+ Tracer.traceContext(t, 1, currentEntry.getAsyncContext());
+ }
+ tryCompleteEntry();
+ actual.onError(t);
+ }
+
+ @Override
+ protected void hookOnCancel() {
+
+ }
+
+ private boolean tryCompleteEntry() {
+ if (currentEntry != null && entryExited.compareAndSet(false, true)) {
+ currentEntry.exit();
+ return true;
+ }
+ return false;
+ }
+}
diff --git a/sentinel-adapter/sentinel-reactor-adapter/src/main/java/com/alibaba/csp/sentinel/adapter/reactor/SentinelReactorTransformer.java b/sentinel-adapter/sentinel-reactor-adapter/src/main/java/com/alibaba/csp/sentinel/adapter/reactor/SentinelReactorTransformer.java
new file mode 100644
index 00000000..0b251dca
--- /dev/null
+++ b/sentinel-adapter/sentinel-reactor-adapter/src/main/java/com/alibaba/csp/sentinel/adapter/reactor/SentinelReactorTransformer.java
@@ -0,0 +1,54 @@
+/*
+ * Copyright 1999-2019 Alibaba Group Holding Ltd.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.csp.sentinel.adapter.reactor;
+
+import java.util.function.Function;
+
+import org.reactivestreams.Publisher;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
+/**
+ * A transformer that transforms given {@code Publisher} to a wrapped Sentinel reactor operator.
+ *
+ * @author Eric Zhao
+ * @since 1.5.0
+ */
+public class SentinelReactorTransformer implements Function, Publisher> {
+
+ private final EntryConfig entryConfig;
+
+ public SentinelReactorTransformer(String resourceName) {
+ this(new EntryConfig(resourceName));
+ }
+
+ public SentinelReactorTransformer(EntryConfig entryConfig) {
+ EntryConfig.assertValid(entryConfig);
+ this.entryConfig = entryConfig;
+ }
+
+ @Override
+ public Publisher apply(Publisher publisher) {
+ if (publisher instanceof Mono) {
+ return new MonoSentinelOperator<>((Mono) publisher, entryConfig);
+ }
+ if (publisher instanceof Flux) {
+ return new FluxSentinelOperator<>((Flux) publisher, entryConfig);
+ }
+
+ throw new IllegalStateException("Publisher type is not supported: " + publisher.getClass().getCanonicalName());
+ }
+}
\ No newline at end of file
diff --git a/sentinel-adapter/sentinel-reactor-adapter/src/test/java/com/alibaba/csp/sentinel/adapter/reactor/FluxSentinelOperatorTestIntegrationTest.java b/sentinel-adapter/sentinel-reactor-adapter/src/test/java/com/alibaba/csp/sentinel/adapter/reactor/FluxSentinelOperatorTestIntegrationTest.java
new file mode 100644
index 00000000..7b144760
--- /dev/null
+++ b/sentinel-adapter/sentinel-reactor-adapter/src/test/java/com/alibaba/csp/sentinel/adapter/reactor/FluxSentinelOperatorTestIntegrationTest.java
@@ -0,0 +1,75 @@
+package com.alibaba.csp.sentinel.adapter.reactor;
+
+import java.util.ArrayList;
+import java.util.Collections;
+
+import com.alibaba.csp.sentinel.node.ClusterNode;
+import com.alibaba.csp.sentinel.slots.block.BlockException;
+import com.alibaba.csp.sentinel.slots.block.flow.FlowRule;
+import com.alibaba.csp.sentinel.slots.block.flow.FlowRuleManager;
+import com.alibaba.csp.sentinel.slots.clusterbuilder.ClusterBuilderSlot;
+
+import org.junit.Test;
+import reactor.core.publisher.Flux;
+import reactor.test.StepVerifier;
+
+import static org.junit.Assert.*;
+
+/**
+ * @author Eric Zhao
+ */
+public class FluxSentinelOperatorTestIntegrationTest {
+
+ @Test
+ public void testEmitMultipleValueSuccess() {
+ String resourceName = createResourceName("testEmitMultipleSuccess");
+ StepVerifier.create(Flux.just(1, 2)
+ .map(e -> e * 2)
+ .transform(new SentinelReactorTransformer<>(resourceName)))
+ .expectNext(2)
+ .expectNext(4)
+ .verifyComplete();
+
+ ClusterNode cn = ClusterBuilderSlot.getClusterNode(resourceName);
+ assertNotNull(cn);
+ assertEquals(1, cn.passQps(), 0.01);
+ }
+
+ @Test
+ public void testEmitFluxError() {
+ String resourceName = createResourceName("testEmitFluxError");
+ StepVerifier.create(Flux.error(new IllegalAccessException("oops"))
+ .transform(new SentinelReactorTransformer<>(resourceName)))
+ .expectError(IllegalAccessException.class)
+ .verify();
+
+ ClusterNode cn = ClusterBuilderSlot.getClusterNode(resourceName);
+ assertNotNull(cn);
+ assertEquals(1, cn.passQps());
+ assertEquals(1, cn.totalException());
+ }
+
+ @Test
+ public void testEmitMultipleValuesWhenFlowControlTriggered() {
+ String resourceName = createResourceName("testEmitMultipleValuesWhenFlowControlTriggered");
+ FlowRuleManager.loadRules(Collections.singletonList(
+ new FlowRule(resourceName).setCount(0)
+ ));
+ StepVerifier.create(Flux.just(1, 3, 5)
+ .map(e -> e * 2)
+ .transform(new SentinelReactorTransformer<>(resourceName)))
+ .expectError(BlockException.class)
+ .verify();
+
+ ClusterNode cn = ClusterBuilderSlot.getClusterNode(resourceName);
+ assertNotNull(cn);
+ assertEquals(0, cn.passQps(), 0.01);
+ assertEquals(1, cn.blockRequest());
+
+ FlowRuleManager.loadRules(new ArrayList<>());
+ }
+
+ private String createResourceName(String resourceName) {
+ return "reactor_test_flux_" + resourceName;
+ }
+}
\ No newline at end of file
diff --git a/sentinel-adapter/sentinel-reactor-adapter/src/test/java/com/alibaba/csp/sentinel/adapter/reactor/MonoSentinelOperatorIntegrationTest.java b/sentinel-adapter/sentinel-reactor-adapter/src/test/java/com/alibaba/csp/sentinel/adapter/reactor/MonoSentinelOperatorIntegrationTest.java
new file mode 100644
index 00000000..e682723a
--- /dev/null
+++ b/sentinel-adapter/sentinel-reactor-adapter/src/test/java/com/alibaba/csp/sentinel/adapter/reactor/MonoSentinelOperatorIntegrationTest.java
@@ -0,0 +1,168 @@
+package com.alibaba.csp.sentinel.adapter.reactor;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collections;
+
+import com.alibaba.csp.sentinel.Constants;
+import com.alibaba.csp.sentinel.EntryType;
+import com.alibaba.csp.sentinel.node.ClusterNode;
+import com.alibaba.csp.sentinel.node.EntranceNode;
+import com.alibaba.csp.sentinel.slots.block.BlockException;
+import com.alibaba.csp.sentinel.slots.block.flow.FlowRule;
+import com.alibaba.csp.sentinel.slots.block.flow.FlowRuleManager;
+import com.alibaba.csp.sentinel.slots.clusterbuilder.ClusterBuilderSlot;
+
+import org.junit.Test;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+import reactor.test.StepVerifier;
+
+import static org.junit.Assert.*;
+
+/**
+ * @author Eric Zhao
+ */
+public class MonoSentinelOperatorIntegrationTest {
+
+ @Test
+ public void testTransformMonoWithSentinelContextEnter() {
+ String resourceName = createResourceName("testTransformMonoWithSentinelContextEnter");
+ String contextName = "test_reactive_context";
+ String origin = "originA";
+ FlowRuleManager.loadRules(Collections.singletonList(
+ new FlowRule(resourceName).setCount(0).setLimitApp(origin).as(FlowRule.class)
+ ));
+ StepVerifier.create(Mono.just(2)
+ .transform(new SentinelReactorTransformer<>(
+ // Customized context with origin.
+ new EntryConfig(resourceName, EntryType.OUT, new ContextConfig(contextName, origin))))
+ )
+ .expectError(BlockException.class)
+ .verify();
+
+ ClusterNode cn = ClusterBuilderSlot.getClusterNode(resourceName);
+ assertNotNull(cn);
+ assertEquals(0, cn.passQps(), 0.01);
+ assertEquals(1, cn.blockRequest());
+ assertTrue(Constants.ROOT.getChildList()
+ .stream()
+ .filter(node -> node instanceof EntranceNode)
+ .map(e -> (EntranceNode)e)
+ .anyMatch(e -> e.getId().getName().equals(contextName))
+ );
+
+ FlowRuleManager.loadRules(new ArrayList<>());
+ }
+
+ @Test
+ public void testFluxToMonoNextThenCancelSuccess() {
+ String resourceName = createResourceName("testFluxToMonoNextThenCancelSuccess");
+ StepVerifier.create(Flux.range(1, 10)
+ .map(e -> e * 2)
+ .next()
+ .transform(new SentinelReactorTransformer<>(resourceName)))
+ .expectNext(2)
+ .verifyComplete();
+
+ ClusterNode cn = ClusterBuilderSlot.getClusterNode(resourceName);
+ assertNotNull(cn);
+ assertEquals(1, cn.passQps(), 0.01);
+ }
+
+ @Test
+ public void testEmitSingleLongTimeRt() {
+ String resourceName = createResourceName("testEmitSingleLongTimeRt");
+ StepVerifier.create(Mono.just(2)
+ .delayElement(Duration.ofMillis(1000))
+ .map(e -> e * 2)
+ .transform(new SentinelReactorTransformer<>(resourceName)))
+ .expectNext(4)
+ .verifyComplete();
+
+ ClusterNode cn = ClusterBuilderSlot.getClusterNode(resourceName);
+ assertNotNull(cn);
+ assertEquals(1000, cn.avgRt(), 20);
+ }
+
+ @Test
+ public void testEmitEmptySuccess() {
+ String resourceName = createResourceName("testEmitEmptySuccess");
+ StepVerifier.create(Mono.empty()
+ .transform(new SentinelReactorTransformer<>(resourceName)))
+ .verifyComplete();
+
+ ClusterNode cn = ClusterBuilderSlot.getClusterNode(resourceName);
+ assertNotNull(cn);
+ assertEquals(1, cn.passQps(), 0.01);
+ }
+
+ @Test
+ public void testEmitSingleSuccess() {
+ String resourceName = createResourceName("testEmitSingleSuccess");
+ StepVerifier.create(Mono.just(1)
+ .transform(new SentinelReactorTransformer<>(resourceName)))
+ .expectNext(1)
+ .verifyComplete();
+
+ ClusterNode cn = ClusterBuilderSlot.getClusterNode(resourceName);
+ assertNotNull(cn);
+ assertEquals(1, cn.passQps(), 0.01);
+ }
+
+ @Test
+ public void testEmitSingleValueWhenFlowControlTriggered() {
+ String resourceName = createResourceName("testEmitSingleValueWhenFlowControlTriggered");
+ FlowRuleManager.loadRules(Collections.singletonList(
+ new FlowRule(resourceName).setCount(0)
+ ));
+ StepVerifier.create(Mono.just(1)
+ .map(e -> e * 2)
+ .transform(new SentinelReactorTransformer<>(resourceName)))
+ .expectError(BlockException.class)
+ .verify();
+
+ ClusterNode cn = ClusterBuilderSlot.getClusterNode(resourceName);
+ assertNotNull(cn);
+ assertEquals(0, cn.passQps(), 0.01);
+ assertEquals(1, cn.blockRequest());
+
+ FlowRuleManager.loadRules(new ArrayList<>());
+ }
+
+ @Test
+ public void testEmitExceptionWhenFlowControlTriggered() {
+ String resourceName = createResourceName("testEmitExceptionWhenFlowControlTriggered");
+ FlowRuleManager.loadRules(Collections.singletonList(
+ new FlowRule(resourceName).setCount(0)
+ ));
+ StepVerifier.create(Mono.error(new IllegalStateException("some"))
+ .transform(new SentinelReactorTransformer<>(resourceName)))
+ .expectError(BlockException.class)
+ .verify();
+
+ ClusterNode cn = ClusterBuilderSlot.getClusterNode(resourceName);
+ assertNotNull(cn);
+ assertEquals(0, cn.passQps(), 0.01);
+ assertEquals(1, cn.blockRequest());
+
+ FlowRuleManager.loadRules(new ArrayList<>());
+ }
+
+ @Test
+ public void testEmitSingleError() {
+ String resourceName = createResourceName("testEmitSingleError");
+ StepVerifier.create(Mono.error(new IllegalStateException())
+ .transform(new SentinelReactorTransformer<>(resourceName)))
+ .expectError(IllegalStateException.class)
+ .verify();
+
+ ClusterNode cn = ClusterBuilderSlot.getClusterNode(resourceName);
+ assertNotNull(cn);
+ assertEquals(1, cn.totalException());
+ }
+
+ private String createResourceName(String resourceName) {
+ return "reactor_test_mono_" + resourceName;
+ }
+}
\ No newline at end of file
diff --git a/sentinel-adapter/sentinel-reactor-adapter/src/test/java/com/alibaba/csp/sentinel/adapter/reactor/ReactorSphUTest.java b/sentinel-adapter/sentinel-reactor-adapter/src/test/java/com/alibaba/csp/sentinel/adapter/reactor/ReactorSphUTest.java
new file mode 100644
index 00000000..0f4e5234
--- /dev/null
+++ b/sentinel-adapter/sentinel-reactor-adapter/src/test/java/com/alibaba/csp/sentinel/adapter/reactor/ReactorSphUTest.java
@@ -0,0 +1,74 @@
+package com.alibaba.csp.sentinel.adapter.reactor;
+
+import java.util.ArrayList;
+import java.util.Collections;
+
+import com.alibaba.csp.sentinel.node.ClusterNode;
+import com.alibaba.csp.sentinel.slots.block.BlockException;
+import com.alibaba.csp.sentinel.slots.block.flow.FlowRule;
+import com.alibaba.csp.sentinel.slots.block.flow.FlowRuleManager;
+import com.alibaba.csp.sentinel.slots.clusterbuilder.ClusterBuilderSlot;
+
+import org.junit.Test;
+import reactor.core.publisher.Mono;
+import reactor.core.scheduler.Schedulers;
+import reactor.test.StepVerifier;
+
+import static org.junit.Assert.*;
+
+/**
+ * @author Eric Zhao
+ */
+public class ReactorSphUTest {
+
+ @Test
+ public void testReactorEntryNormalWhenFlowControlTriggered() {
+ String resourceName = createResourceName("testReactorEntryNormalWhenFlowControlTriggered");
+ FlowRuleManager.loadRules(Collections.singletonList(
+ new FlowRule(resourceName).setCount(0)
+ ));
+ StepVerifier.create(ReactorSphU.entryWith(resourceName, Mono.just(60))
+ .subscribeOn(Schedulers.elastic())
+ .map(e -> e * 3))
+ .expectError(BlockException.class)
+ .verify();
+
+ ClusterNode cn = ClusterBuilderSlot.getClusterNode(resourceName);
+ assertNotNull(cn);
+ assertEquals(0, cn.passQps(), 0.01);
+ assertEquals(1, cn.blockRequest());
+
+ FlowRuleManager.loadRules(new ArrayList<>());
+ }
+
+ @Test
+ public void testReactorEntryWithCommon() {
+ String resourceName = createResourceName("testReactorEntryWithCommon");
+ StepVerifier.create(ReactorSphU.entryWith(resourceName, Mono.just(60))
+ .subscribeOn(Schedulers.elastic())
+ .map(e -> e * 3))
+ .expectNext(180)
+ .verifyComplete();
+
+ ClusterNode cn = ClusterBuilderSlot.getClusterNode(resourceName);
+ assertNotNull(cn);
+ assertEquals(1, cn.passQps(), 0.01);
+ }
+
+ @Test
+ public void testReactorEntryWithBizException() {
+ String resourceName = createResourceName("testReactorEntryWithBizException");
+ StepVerifier.create(ReactorSphU.entryWith(resourceName, Mono.error(new IllegalStateException())))
+ .expectError(IllegalStateException.class)
+ .verify();
+
+ ClusterNode cn = ClusterBuilderSlot.getClusterNode(resourceName);
+ assertNotNull(cn);
+ assertEquals(1, cn.passQps(), 0.01);
+ assertEquals(1, cn.totalException());
+ }
+
+ private String createResourceName(String resourceName) {
+ return "reactor_test_SphU_" + resourceName;
+ }
+}
\ No newline at end of file