Browse Source

Migrate reactor testing to Java (#5679)

Anuraag Agrawal 3 years ago
parent
commit
5ec0937ea7
13 changed files with 918 additions and 841 deletions
  1. 0 10
      instrumentation/reactor/reactor-3.1/javaagent/src/test/groovy/ReactorCoreTest.groovy
  2. 0 10
      instrumentation/reactor/reactor-3.1/javaagent/src/test/groovy/SubscriptionTest.groovy
  3. 20 0
      instrumentation/reactor/reactor-3.1/javaagent/src/test/java/io/opentelemetry/instrumentation/reactor/ReactorCoreTest.java
  4. 20 0
      instrumentation/reactor/reactor-3.1/javaagent/src/test/java/io/opentelemetry/instrumentation/reactor/SubscriptionTest.java
  5. 0 218
      instrumentation/reactor/reactor-3.1/library/src/test/groovy/io/opentelemetry/instrumentation/reactor/ReactorCoreTest.groovy
  6. 0 22
      instrumentation/reactor/reactor-3.1/library/src/test/groovy/io/opentelemetry/instrumentation/reactor/SubscriptionTest.groovy
  7. 225 0
      instrumentation/reactor/reactor-3.1/library/src/test/java/io/opentelemetry/instrumentation/reactor/ReactorCoreTest.java
  8. 34 0
      instrumentation/reactor/reactor-3.1/library/src/test/java/io/opentelemetry/instrumentation/reactor/SubscriptionTest.java
  9. 0 2
      instrumentation/reactor/reactor-3.1/testing/build.gradle.kts
  10. 0 521
      instrumentation/reactor/reactor-3.1/testing/src/main/groovy/io/opentelemetry/instrumentation/reactor/AbstractReactorCoreTest.groovy
  11. 0 58
      instrumentation/reactor/reactor-3.1/testing/src/main/groovy/io/opentelemetry/instrumentation/reactor/AbstractSubscriptionTest.groovy
  12. 578 0
      instrumentation/reactor/reactor-3.1/testing/src/main/java/io/opentelemetry/instrumentation/reactor/AbstractReactorCoreTest.java
  13. 41 0
      instrumentation/reactor/reactor-3.1/testing/src/main/java/io/opentelemetry/instrumentation/reactor/AbstractSubscriptionTest.java

+ 0 - 10
instrumentation/reactor/reactor-3.1/javaagent/src/test/groovy/ReactorCoreTest.groovy

@@ -1,10 +0,0 @@
-/*
- * Copyright The OpenTelemetry Authors
- * SPDX-License-Identifier: Apache-2.0
- */
-
-import io.opentelemetry.instrumentation.reactor.AbstractReactorCoreTest
-import io.opentelemetry.instrumentation.test.AgentTestTrait
-
-class ReactorCoreTest extends AbstractReactorCoreTest implements AgentTestTrait {
-}

+ 0 - 10
instrumentation/reactor/reactor-3.1/javaagent/src/test/groovy/SubscriptionTest.groovy

@@ -1,10 +0,0 @@
-/*
- * Copyright The OpenTelemetry Authors
- * SPDX-License-Identifier: Apache-2.0
- */
-
-import io.opentelemetry.instrumentation.reactor.AbstractSubscriptionTest
-import io.opentelemetry.instrumentation.test.AgentTestTrait
-
-class SubscriptionTest extends AbstractSubscriptionTest implements AgentTestTrait {
-}

+ 20 - 0
instrumentation/reactor/reactor-3.1/javaagent/src/test/java/io/opentelemetry/instrumentation/reactor/ReactorCoreTest.java

@@ -0,0 +1,20 @@
+/*
+ * Copyright The OpenTelemetry Authors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package io.opentelemetry.instrumentation.reactor;
+
+import io.opentelemetry.instrumentation.testing.junit.AgentInstrumentationExtension;
+import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension;
+import org.junit.jupiter.api.extension.RegisterExtension;
+
+class ReactorCoreTest extends AbstractReactorCoreTest {
+
+  @RegisterExtension
+  static final InstrumentationExtension testing = AgentInstrumentationExtension.create();
+
+  ReactorCoreTest() {
+    super(testing);
+  }
+}

+ 20 - 0
instrumentation/reactor/reactor-3.1/javaagent/src/test/java/io/opentelemetry/instrumentation/reactor/SubscriptionTest.java

@@ -0,0 +1,20 @@
+/*
+ * Copyright The OpenTelemetry Authors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package io.opentelemetry.instrumentation.reactor;
+
+import io.opentelemetry.instrumentation.testing.junit.AgentInstrumentationExtension;
+import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension;
+import org.junit.jupiter.api.extension.RegisterExtension;
+
+class SubscriptionTest extends AbstractSubscriptionTest {
+
+  @RegisterExtension
+  static final InstrumentationExtension testing = AgentInstrumentationExtension.create();
+
+  SubscriptionTest() {
+    super(testing);
+  }
+}

+ 0 - 218
instrumentation/reactor/reactor-3.1/library/src/test/groovy/io/opentelemetry/instrumentation/reactor/ReactorCoreTest.groovy

@@ -1,218 +0,0 @@
-/*
- * Copyright The OpenTelemetry Authors
- * SPDX-License-Identifier: Apache-2.0
- */
-
-package io.opentelemetry.instrumentation.reactor
-
-import io.opentelemetry.api.GlobalOpenTelemetry
-import io.opentelemetry.api.trace.Span
-import io.opentelemetry.api.trace.SpanKind
-import io.opentelemetry.api.trace.StatusCode
-import io.opentelemetry.context.Context
-import io.opentelemetry.instrumentation.test.LibraryTestTrait
-import reactor.core.publisher.Flux
-import reactor.core.publisher.Mono
-import spock.lang.Shared
-
-class ReactorCoreTest extends AbstractReactorCoreTest implements LibraryTestTrait {
-  @Shared
-  ContextPropagationOperator tracingOperator = ContextPropagationOperator.create()
-
-  def setupSpec() {
-    tracingOperator.registerOnEachOperator()
-  }
-
-  def cleanupSpec() {
-    tracingOperator.resetOnEachOperator()
-  }
-
-  def "Current in non-blocking publisher assembly"() {
-    when:
-    runWithSpan({
-      return publisherSupplier().transform({ publisher -> traceNonBlocking(publisher, "inner") })
-    })
-
-    then:
-    assertTraces(1) {
-      trace(0, 3) {
-        span(0) {
-          name "trace-parent"
-          hasNoParent()
-          attributes {
-          }
-        }
-
-        span(1) {
-          name "publisher-parent"
-          kind SpanKind.INTERNAL
-          childOf span(0)
-        }
-
-        span(2) {
-          name "inner"
-          childOf span(1)
-          attributes {
-            "inner" "foo"
-          }
-        }
-      }
-    }
-
-    where:
-    paramName    | publisherSupplier
-    "basic mono" | { ->
-      Mono.fromCallable({ i ->
-        Span.current().setAttribute("inner", "foo")
-        return 1
-      })
-    }
-    "basic flux" | { ->
-      Flux.defer({
-        Span.current().setAttribute("inner", "foo")
-        return Flux.just([5, 6].toArray())
-      })
-    }
-  }
-
-  def "Nested non-blocking"() {
-    when:
-    def result = runWithSpan({
-      Mono.defer({ ->
-        Span.current().setAttribute("middle", "foo")
-        return Mono.fromCallable({ ->
-          Span.current().setAttribute("inner", "bar")
-          return 1
-        })
-          .transform({ i -> traceNonBlocking(i, "inner") })
-      })
-        .transform({ m -> traceNonBlocking(m, "middle") })
-    })
-
-    then:
-    result == 1
-    and:
-    assertTraces(1) {
-      trace(0, 4) {
-        span(0) {
-          name "trace-parent"
-          hasNoParent()
-          attributes {
-          }
-        }
-
-        span(1) {
-          name "publisher-parent"
-          kind SpanKind.INTERNAL
-          childOf span(0)
-        }
-
-        span(2) {
-          name "middle"
-          childOf span(1)
-          attributes {
-            "middle" "foo"
-          }
-        }
-
-        span(3) {
-          name "inner"
-          childOf span(2)
-          attributes {
-            "inner" "bar"
-          }
-        }
-      }
-    }
-  }
-
-
-  def "No tracing before registration"() {
-    when:
-    tracingOperator.resetOnEachOperator()
-
-    def result1 = Mono.fromCallable({ ->
-      assert !Span.current().getSpanContext().isValid(): "current span is not set"
-      return 1
-    })
-      .transform({ i ->
-
-        def beforeSpan = GlobalOpenTelemetry.getTracer("test").spanBuilder("before").startSpan()
-
-        return ContextPropagationOperator
-          .runWithContext(i, Context.root().with(beforeSpan))
-          .doOnEach({ signal ->
-            assert !Span.current().getSpanContext().isValid(): "current span is not set"
-          })
-      }).block()
-
-    tracingOperator.registerOnEachOperator()
-    def result2 = Mono.fromCallable({ ->
-      assert Span.current().getSpanContext().isValid(): "current span is set"
-      return 2
-    })
-      .transform({ i ->
-
-        def afterSpan = GlobalOpenTelemetry.getTracer("test").spanBuilder("after").startSpan()
-
-        return ContextPropagationOperator
-          .runWithContext(i, Context.root().with(afterSpan))
-          .doOnEach({ signal ->
-            assert Span.current().getSpanContext().isValid(): "current span is set"
-            if (signal.isOnComplete()) {
-              Span.current().end()
-            }
-          })
-      }).block()
-
-    then:
-    result1 == 1
-    result2 == 2
-    and:
-    assertTraces(1) {
-      trace(0, 1) {
-        span(0) {
-          name "after"
-          hasNoParent()
-          attributes {
-          }
-        }
-      }
-    }
-  }
-
-  def traceNonBlocking(def publisher, def spanName) {
-    return getDummy(publisher)
-      .flatMap({ i -> publisher })
-      .doOnEach({ signal ->
-        if (signal.isOnError()) {
-          // reactor 3.1 does not support getting context here yet
-          Span.current().setStatus(StatusCode.ERROR)
-          Span.current().end()
-        } else if (signal.isOnComplete()) {
-          Span.current().end()
-        }
-      })
-      .subscriberContext({ ctx ->
-
-        def parent = ContextPropagationOperator.getOpenTelemetryContext(ctx, Context.current())
-
-        def innerSpan = GlobalOpenTelemetry.getTracer("test")
-          .spanBuilder(spanName)
-          .setParent(parent)
-          .startSpan()
-
-        return ContextPropagationOperator.storeOpenTelemetryContext(ctx, parent.with(innerSpan))
-      })
-  }
-
-  def getDummy(def publisher) {
-    if (publisher instanceof Mono) {
-      return ContextPropagationOperator.ScalarPropagatingMono.INSTANCE
-    } else if (publisher instanceof Flux) {
-      return ContextPropagationOperator.ScalarPropagatingFlux.INSTANCE
-    }
-
-    throw new IllegalStateException("Unknown publisher")
-  }
-}

+ 0 - 22
instrumentation/reactor/reactor-3.1/library/src/test/groovy/io/opentelemetry/instrumentation/reactor/SubscriptionTest.groovy

@@ -1,22 +0,0 @@
-/*
- * Copyright The OpenTelemetry Authors
- * SPDX-License-Identifier: Apache-2.0
- */
-
-package io.opentelemetry.instrumentation.reactor
-
-import io.opentelemetry.instrumentation.test.LibraryTestTrait
-import spock.lang.Shared
-
-class SubscriptionTest extends AbstractSubscriptionTest implements LibraryTestTrait {
-  @Shared
-  ContextPropagationOperator tracingOperator = ContextPropagationOperator.create()
-
-  def setupSpec() {
-    tracingOperator.registerOnEachOperator()
-  }
-
-  def cleanupSpec() {
-    tracingOperator.resetOnEachOperator()
-  }
-}

+ 225 - 0
instrumentation/reactor/reactor-3.1/library/src/test/java/io/opentelemetry/instrumentation/reactor/ReactorCoreTest.java

@@ -0,0 +1,225 @@
+/*
+ * Copyright The OpenTelemetry Authors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package io.opentelemetry.instrumentation.reactor;
+
+import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.attributeEntry;
+import static org.assertj.core.api.Assertions.assertThat;
+
+import io.opentelemetry.api.trace.Span;
+import io.opentelemetry.api.trace.StatusCode;
+import io.opentelemetry.api.trace.Tracer;
+import io.opentelemetry.context.Context;
+import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension;
+import io.opentelemetry.instrumentation.testing.junit.LibraryInstrumentationExtension;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
+class ReactorCoreTest extends AbstractReactorCoreTest {
+
+  @RegisterExtension
+  static final InstrumentationExtension testing = LibraryInstrumentationExtension.create();
+
+  private final ContextPropagationOperator tracingOperator = ContextPropagationOperator.create();
+  private final Tracer tracer = testing.getOpenTelemetry().getTracer("test");
+
+  ReactorCoreTest() {
+    super(testing);
+  }
+
+  @BeforeAll
+  void setUp() {
+    tracingOperator.registerOnEachOperator();
+  }
+
+  @AfterAll
+  void tearDown() {
+    tracingOperator.resetOnEachOperator();
+  }
+
+  @Test
+  void monoInNonBlockingPublisherAssembly() {
+    testing.runWithSpan(
+        "parent",
+        () ->
+            monoSpan(
+                    Mono.fromCallable(
+                        () -> {
+                          Span.current().setAttribute("inner", "foo");
+                          return 1;
+                        }),
+                    "inner")
+                .block());
+
+    testing.waitAndAssertTraces(
+        trace ->
+            trace.hasSpansSatisfyingExactly(
+                span -> span.hasName("parent").hasNoParent(),
+                span ->
+                    span.hasName("inner")
+                        .hasParent(trace.getSpan(0))
+                        .hasAttributes(attributeEntry("inner", "foo"))));
+  }
+
+  @Test
+  void fluxInNonBlockingPublisherAssembly() {
+    testing.runWithSpan(
+        "parent",
+        () ->
+            ContextPropagationOperator.ScalarPropagatingFlux.INSTANCE
+                .flatMap(
+                    unused ->
+                        Flux.defer(
+                            () -> {
+                              Span.current().setAttribute("inner", "foo");
+                              return Flux.just(5, 6);
+                            }))
+                .doOnEach(
+                    signal -> {
+                      if (signal.isOnError()) {
+                        // reactor 3.1 does not support getting context here yet
+                        Span.current().setStatus(StatusCode.ERROR);
+                        Span.current().end();
+                      } else if (signal.isOnComplete()) {
+                        Span.current().end();
+                      }
+                    })
+                .subscriberContext(
+                    ctx -> {
+                      Context parent =
+                          ContextPropagationOperator.getOpenTelemetryContext(
+                              ctx, Context.current());
+
+                      Span innerSpan = tracer.spanBuilder("inner").setParent(parent).startSpan();
+                      return ContextPropagationOperator.storeOpenTelemetryContext(
+                          ctx, parent.with(innerSpan));
+                    })
+                .collectList()
+                .block());
+
+    testing.waitAndAssertTraces(
+        trace ->
+            trace.hasSpansSatisfyingExactly(
+                span -> span.hasName("parent").hasNoParent(),
+                span ->
+                    span.hasName("inner")
+                        .hasParent(trace.getSpan(0))
+                        .hasAttributes(attributeEntry("inner", "foo"))));
+  }
+
+  @Test
+  void nestedNonBlocking() {
+    int result =
+        testing.runWithSpan(
+            "parent",
+            () ->
+                Mono.defer(
+                        () -> {
+                          Span.current().setAttribute("middle", "foo");
+                          return Mono.fromCallable(
+                                  () -> {
+                                    Span.current().setAttribute("inner", "bar");
+                                    return 1;
+                                  })
+                              .transform(publisher -> monoSpan(publisher, "inner"));
+                        })
+                    .transform(publisher -> monoSpan(publisher, "middle"))
+                    .block());
+
+    assertThat(result).isEqualTo(1);
+
+    testing.waitAndAssertTraces(
+        trace ->
+            trace.hasSpansSatisfyingExactly(
+                span -> span.hasName("parent").hasNoParent(),
+                span ->
+                    span.hasName("middle")
+                        .hasParent(trace.getSpan(0))
+                        .hasAttributes(attributeEntry("middle", "foo")),
+                span ->
+                    span.hasName("inner")
+                        .hasParent(trace.getSpan(1))
+                        .hasAttributes(attributeEntry("inner", "bar"))));
+  }
+
+  @Test
+  void noTracingBeforeRegistration() {
+    tracingOperator.resetOnEachOperator();
+
+    Integer result1 =
+        Mono.fromCallable(
+                () -> {
+                  assertThat(Span.current().getSpanContext().isValid()).isFalse();
+                  return 1;
+                })
+            .transform(
+                mono -> {
+                  // NB: Because context propagation is disabled, this span is effectively leaked as
+                  // we cannot access it again to
+                  // end after processing.
+                  Span span = tracer.spanBuilder("before").startSpan();
+                  return ContextPropagationOperator.runWithContext(mono, Context.root().with(span))
+                      .doOnEach(
+                          unused ->
+                              assertThat(Span.current().getSpanContext().isValid()).isFalse());
+                })
+            .block();
+
+    tracingOperator.registerOnEachOperator();
+    Integer result2 =
+        Mono.fromCallable(
+                () -> {
+                  assertThat(Span.current().getSpanContext().isValid()).isTrue();
+                  return 2;
+                })
+            .transform(
+                mono -> {
+                  Span span = tracer.spanBuilder("after").startSpan();
+                  return ContextPropagationOperator.runWithContext(mono, Context.root().with(span))
+                      .doOnEach(
+                          signal -> {
+                            assertThat(Span.current().getSpanContext().isValid()).isTrue();
+                            if (signal.isOnComplete()) {
+                              Span.current().end();
+                            }
+                          });
+                })
+            .block();
+
+    assertThat(result1).isEqualTo(1);
+    assertThat(result2).isEqualTo(2);
+
+    testing.waitAndAssertTraces(
+        trace -> trace.hasSpansSatisfyingExactly(span -> span.hasName("after").hasNoParent()));
+  }
+
+  private <T> Mono<T> monoSpan(Mono<T> mono, String spanName) {
+    return ContextPropagationOperator.ScalarPropagatingMono.INSTANCE
+        .flatMap(unused -> mono)
+        .doOnEach(
+            signal -> {
+              if (signal.isOnError()) {
+                // reactor 3.1 does not support getting context here yet
+                Span.current().setStatus(StatusCode.ERROR);
+                Span.current().end();
+              } else if (signal.isOnComplete()) {
+                Span.current().end();
+              }
+            })
+        .subscriberContext(
+            ctx -> {
+              Context parent =
+                  ContextPropagationOperator.getOpenTelemetryContext(ctx, Context.current());
+
+              Span innerSpan = tracer.spanBuilder(spanName).setParent(parent).startSpan();
+              return ContextPropagationOperator.storeOpenTelemetryContext(
+                  ctx, parent.with(innerSpan));
+            });
+  }
+}

+ 34 - 0
instrumentation/reactor/reactor-3.1/library/src/test/java/io/opentelemetry/instrumentation/reactor/SubscriptionTest.java

@@ -0,0 +1,34 @@
+/*
+ * Copyright The OpenTelemetry Authors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package io.opentelemetry.instrumentation.reactor;
+
+import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension;
+import io.opentelemetry.instrumentation.testing.junit.LibraryInstrumentationExtension;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.extension.RegisterExtension;
+
+class SubscriptionTest extends AbstractSubscriptionTest {
+
+  @RegisterExtension
+  static final InstrumentationExtension testing = LibraryInstrumentationExtension.create();
+
+  private final ContextPropagationOperator tracingOperator = ContextPropagationOperator.create();
+
+  SubscriptionTest() {
+    super(testing);
+  }
+
+  @BeforeAll
+  void setUp() {
+    tracingOperator.registerOnEachOperator();
+  }
+
+  @AfterAll
+  void tearDown() {
+    tracingOperator.resetOnEachOperator();
+  }
+}

+ 0 - 2
instrumentation/reactor/reactor-3.1/testing/build.gradle.kts

@@ -7,7 +7,5 @@ dependencies {
 
   api("io.projectreactor:reactor-core:3.1.0.RELEASE")
 
-  implementation("org.apache.groovy:groovy")
   implementation("io.opentelemetry:opentelemetry-api")
-  implementation("org.spockframework:spock-core")
 }

+ 0 - 521
instrumentation/reactor/reactor-3.1/testing/src/main/groovy/io/opentelemetry/instrumentation/reactor/AbstractReactorCoreTest.groovy

@@ -1,521 +0,0 @@
-/*
- * Copyright The OpenTelemetry Authors
- * SPDX-License-Identifier: Apache-2.0
- */
-
-package io.opentelemetry.instrumentation.reactor
-
-import io.opentelemetry.api.GlobalOpenTelemetry
-import io.opentelemetry.api.common.AttributeKey
-import io.opentelemetry.api.trace.Span
-import io.opentelemetry.api.trace.SpanKind
-import io.opentelemetry.context.Context
-import io.opentelemetry.instrumentation.test.InstrumentationSpecification
-import org.reactivestreams.Publisher
-import org.reactivestreams.Subscriber
-import org.reactivestreams.Subscription
-import reactor.core.publisher.Flux
-import reactor.core.publisher.Mono
-import spock.lang.Shared
-import spock.lang.Unroll
-
-import java.time.Duration
-
-import static io.opentelemetry.api.trace.StatusCode.ERROR
-
-@Unroll
-abstract class AbstractReactorCoreTest extends InstrumentationSpecification {
-
-  public static final String EXCEPTION_MESSAGE = "test exception"
-
-  @Shared
-  def addOne = { i ->
-    addOneFunc(i)
-  }
-
-  @Shared
-  def addTwo = { i ->
-    addTwoFunc(i)
-  }
-
-  @Shared
-  def throwException = {
-    throw new IllegalStateException(EXCEPTION_MESSAGE)
-  }
-
-  def "Publisher '#paramName' test"() {
-    when:
-    def result = runWithSpan(publisherSupplier)
-
-    then:
-    result == expected
-    and:
-    assertTraces(1) {
-      trace(0, workSpans + 2) {
-        span(0) {
-          name "trace-parent"
-          kind SpanKind.INTERNAL
-          hasNoParent()
-        }
-        span(1) {
-          name "publisher-parent"
-          kind SpanKind.INTERNAL
-          childOf span(0)
-        }
-
-        for (int i = 0; i < workSpans; i++) {
-          span(2 + i) {
-            name "add one"
-            kind SpanKind.INTERNAL
-            childOf span(1)
-          }
-        }
-      }
-    }
-
-    where:
-    paramName             | expected | workSpans | publisherSupplier
-    "basic mono"          | 2        | 1         | { -> Mono.just(1).map(addOne) }
-    "two operations mono" | 4        | 2         | { -> Mono.just(2).map(addOne).map(addOne) }
-    "delayed mono"        | 4        | 1         | { ->
-      Mono.just(3).delayElement(Duration.ofMillis(100)).map(addOne)
-    }
-    "delayed twice mono"  | 6        | 2         | { ->
-      Mono.just(4).delayElement(Duration.ofMillis(100)).map(addOne).delayElement(Duration.ofMillis(100)).map(addOne)
-    }
-    "basic flux"          | [6, 7]   | 2         | { -> Flux.fromIterable([5, 6]).map(addOne) }
-    "two operations flux" | [8, 9]   | 4         | { ->
-      Flux.fromIterable([6, 7]).map(addOne).map(addOne)
-    }
-    "delayed flux"        | [8, 9]   | 2         | { ->
-      Flux.fromIterable([7, 8]).delayElements(Duration.ofMillis(100)).map(addOne)
-    }
-    "delayed twice flux"  | [10, 11] | 4         | { ->
-      Flux.fromIterable([8, 9]).delayElements(Duration.ofMillis(100)).map(addOne).delayElements(Duration.ofMillis(100)).map(addOne)
-    }
-
-    "mono from callable"  | 12       | 2         | { ->
-      Mono.fromCallable({ addOneFunc(10) }).map(addOne)
-    }
-  }
-
-  def "Publisher error '#paramName' test"() {
-    when:
-    runWithSpan(publisherSupplier)
-
-    then:
-    def exception = thrown RuntimeException
-    exception.message == EXCEPTION_MESSAGE
-    and:
-    assertTraces(1) {
-      trace(0, 2) {
-        span(0) {
-          name "trace-parent"
-          status ERROR
-          errorEvent(RuntimeException, EXCEPTION_MESSAGE)
-          hasNoParent()
-        }
-
-        // It's important that we don't attach errors at the Reactor level so that we don't
-        // impact the spans on reactor instrumentations such as netty and lettuce, as reactor is
-        // more of a context propagation mechanism than something we would be tracking for
-        // errors this is ok.
-        span(1) {
-          name "publisher-parent"
-          kind SpanKind.INTERNAL
-          childOf span(0)
-        }
-      }
-    }
-
-    where:
-    paramName | publisherSupplier
-    "mono"    | { -> Mono.error(new RuntimeException(EXCEPTION_MESSAGE)) }
-    "flux"    | { -> Flux.error(new RuntimeException(EXCEPTION_MESSAGE)) }
-  }
-
-  def "Publisher step '#paramName' test"() {
-    when:
-    runWithSpan(publisherSupplier)
-
-    then:
-    def exception = thrown IllegalStateException
-    exception.message == EXCEPTION_MESSAGE
-    and:
-    assertTraces(1) {
-      trace(0, workSpans + 2) {
-        span(0) {
-          name "trace-parent"
-          status ERROR
-          errorEvent(IllegalStateException, EXCEPTION_MESSAGE)
-          hasNoParent()
-        }
-
-        // It's important that we don't attach errors at the Reactor level so that we don't
-        // impact the spans on reactor instrumentations such as netty and lettuce, as reactor is
-        // more of a context propagation mechanism than something we would be tracking for
-        // errors this is ok.
-        span(1) {
-          name "publisher-parent"
-          kind SpanKind.INTERNAL
-          childOf span(0)
-        }
-
-        for (int i = 0; i < workSpans; i++) {
-          span(i + 2) {
-            name "add one"
-            childOf span(1)
-            attributes {
-            }
-          }
-        }
-      }
-    }
-
-    where:
-    paramName            | workSpans | publisherSupplier
-    "basic mono failure" | 1         | { -> Mono.just(1).map(addOne).map({ throwException() }) }
-    "basic flux failure" | 1         | { ->
-      Flux.fromIterable([5, 6]).map(addOne).map({ throwException() })
-    }
-  }
-
-  def "Publisher '#paramName' cancel"() {
-    when:
-    cancelUnderTrace(publisherSupplier)
-
-    then:
-    assertTraces(1) {
-      trace(0, 2) {
-        span(0) {
-          name "trace-parent"
-          hasNoParent()
-          attributes {
-          }
-        }
-
-        span(1) {
-          name "publisher-parent"
-          kind SpanKind.INTERNAL
-          childOf span(0)
-        }
-      }
-    }
-
-    where:
-    paramName    | publisherSupplier
-    "basic mono" | { -> Mono.just(1) }
-    "basic flux" | { -> Flux.fromIterable([5, 6]) }
-  }
-
-  def "Publisher chain spans have the correct parent for '#paramName'"() {
-    when:
-    runWithSpan(publisherSupplier)
-
-    then:
-    assertTraces(1) {
-      trace(0, workSpans + 2) {
-        span(0) {
-          name "trace-parent"
-          hasNoParent()
-          attributes {
-          }
-        }
-
-        span(1) {
-          name "publisher-parent"
-          kind SpanKind.INTERNAL
-          childOf span(0)
-        }
-
-        for (int i = 0; i < workSpans; i++) {
-          span(i + 2) {
-            name "add one"
-            childOf span(1)
-            attributes {
-            }
-          }
-        }
-      }
-    }
-
-    where:
-    paramName    | workSpans | publisherSupplier
-    "basic mono" | 3         | { ->
-      Mono.just(1).map(addOne).map(addOne).then(Mono.just(1).map(addOne))
-    }
-    "basic flux" | 5         | { ->
-      Flux.fromIterable([5, 6]).map(addOne).map(addOne).then(Mono.just(1).map(addOne))
-    }
-  }
-
-  def "Publisher chain spans have the correct parents from assembly time '#paramName'"() {
-    when:
-    runWithSpan {
-      // The "add one" operations in the publisher created here should be children of the publisher-parent
-      Publisher<Integer> publisher = publisherSupplier()
-
-      def tracer = GlobalOpenTelemetry.getTracer("test")
-      def intermediate = tracer.spanBuilder("intermediate").startSpan()
-      // After this activation, the "add two" operations below should be children of this span
-      def scope = Context.current().with(intermediate).makeCurrent()
-      try {
-        if (publisher instanceof Mono) {
-          return ((Mono) publisher).map(addTwo)
-        } else if (publisher instanceof Flux) {
-          return ((Flux) publisher).map(addTwo)
-        }
-        throw new IllegalStateException("Unknown publisher type")
-      } finally {
-        intermediate.end()
-        scope.close()
-      }
-    }
-
-    then:
-    assertTraces(1) {
-      trace(0, (workItems * 2) + 3) {
-        span(0) {
-          name "trace-parent"
-          kind SpanKind.INTERNAL
-          hasNoParent()
-        }
-        span(1) {
-          name "publisher-parent"
-          kind SpanKind.INTERNAL
-          childOf span(0)
-        }
-        span(2) {
-          name "intermediate"
-          kind SpanKind.INTERNAL
-          childOf span(1)
-        }
-
-        for (int i = 0; i < 2 * workItems; i = i + 2) {
-          span(3 + i) {
-            name "add one"
-            kind SpanKind.INTERNAL
-            childOf span(1)
-          }
-          span(3 + i + 1) {
-            name "add two"
-            kind SpanKind.INTERNAL
-            childOf span(1)
-          }
-        }
-      }
-    }
-
-    where:
-    paramName    | workItems | publisherSupplier
-    "basic mono" | 1         | { -> Mono.just(1).map(addOne) }
-    "basic flux" | 2         | { -> Flux.fromIterable([1, 2]).map(addOne) }
-  }
-
-  def "Nested delayed mono with high concurrency"() {
-    setup:
-    def iterations = 100
-    def remainingIterations = new HashSet<>((0L..<iterations).toList())
-
-    when:
-    (0L..<iterations).forEach { iteration ->
-      def outer = Mono.just("")
-        .map({ it })
-        .delayElement(Duration.ofMillis(10))
-        .map({ it })
-        .delayElement(Duration.ofMillis(10))
-        .doOnSuccess({
-          def middle = Mono.just("")
-            .map({ it })
-            .delayElement(Duration.ofMillis(10))
-            .doOnSuccess({
-              runWithSpan("inner") {
-                Span.current().setAttribute("iteration", iteration)
-              }
-            })
-
-          runWithSpan("middle") {
-            Span.current().setAttribute("iteration", iteration)
-            middle.subscribe()
-          }
-        })
-
-      // Context must propagate even if only subscribe is in root span scope
-      runWithSpan("outer") {
-        Span.current().setAttribute("iteration", iteration)
-        outer.subscribe()
-      }
-    }
-
-    then:
-    assertTraces(iterations) {
-      for (int i = 0; i < iterations; i++) {
-        trace(i, 3) {
-          long iteration = -1
-          span(0) {
-            name("outer")
-            iteration = span.getAttributes().get(AttributeKey.longKey("iteration")).toLong()
-            assert remainingIterations.remove(iteration)
-          }
-          span(1) {
-            name("middle")
-            childOf(span(0))
-            assert span.getAttributes().get(AttributeKey.longKey("iteration")) == iteration
-          }
-          span(2) {
-            name("inner")
-            childOf(span(1))
-            assert span.getAttributes().get(AttributeKey.longKey("iteration")) == iteration
-          }
-        }
-      }
-    }
-
-    assert remainingIterations.isEmpty()
-  }
-
-  def "Nested delayed flux with high concurrency"() {
-    setup:
-    def iterations = 100
-    def remainingIterations = new HashSet<>((0L..<iterations).toList())
-
-    when:
-    (0L..<iterations).forEach { iteration ->
-      def outer = Flux.just("a", "b")
-        .map({ it })
-        .delayElements(Duration.ofMillis(10))
-        .map({ it })
-        .delayElements(Duration.ofMillis(10))
-        .doOnEach({ middleSignal ->
-          if (middleSignal.hasValue()) {
-            def value = middleSignal.get()
-
-            def middle = Flux.just("c", "d")
-              .map({ it })
-              .delayElements(Duration.ofMillis(10))
-              .doOnEach({ innerSignal ->
-                if (innerSignal.hasValue()) {
-                  runWithSpan("inner " + value + innerSignal.get()) {
-                    Span.current().setAttribute("iteration", iteration)
-                  }
-                }
-              })
-
-            runWithSpan("middle " + value) {
-              Span.current().setAttribute("iteration", iteration)
-              middle.subscribe()
-            }
-          }
-        })
-
-      // Context must propagate even if only subscribe is in root span scope
-      runWithSpan("outer") {
-        Span.current().setAttribute("iteration", iteration)
-        outer.subscribe()
-      }
-    }
-
-    then:
-    assertTraces(iterations) {
-      for (int i = 0; i < iterations; i++) {
-        trace(i, 7) {
-          long iteration = -1
-          String middleA = null
-          String middleB = null
-          span(0) {
-            name("outer")
-            iteration = span.getAttributes().get(AttributeKey.longKey("iteration")).toLong()
-            assert remainingIterations.remove(iteration)
-          }
-          span("middle a") {
-            middleA = span.spanId
-            childOf(span(0))
-            assert span.getAttributes().get(AttributeKey.longKey("iteration")) == iteration
-          }
-          span("middle b") {
-            middleB = span.spanId
-            childOf(span(0))
-            assert span.getAttributes().get(AttributeKey.longKey("iteration")) == iteration
-          }
-          span("inner ac") {
-            parentSpanId(middleA)
-            assert span.getAttributes().get(AttributeKey.longKey("iteration")) == iteration
-          }
-          span("inner ad") {
-            parentSpanId(middleA)
-            assert span.getAttributes().get(AttributeKey.longKey("iteration")) == iteration
-          }
-          span("inner bc") {
-            parentSpanId(middleB)
-            assert span.getAttributes().get(AttributeKey.longKey("iteration")) == iteration
-          }
-          span("inner bd") {
-            parentSpanId(middleB)
-            assert span.getAttributes().get(AttributeKey.longKey("iteration")) == iteration
-          }
-        }
-      }
-    }
-
-    assert remainingIterations.isEmpty()
-  }
-
-  def runWithSpan(def publisherSupplier) {
-    runWithSpan("trace-parent") {
-      def tracer = GlobalOpenTelemetry.getTracer("test")
-      def span = tracer.spanBuilder("publisher-parent").startSpan()
-      def scope = Context.current().with(span).makeCurrent()
-      try {
-        def publisher = publisherSupplier()
-        // Read all data from publisher
-        if (publisher instanceof Mono) {
-          return publisher.block()
-        } else if (publisher instanceof Flux) {
-          return publisher.toStream().toArray({ size -> new Integer[size] })
-        }
-
-        throw new IllegalStateException("Unknown publisher: " + publisher)
-      } finally {
-        span.end()
-        scope.close()
-      }
-    }
-  }
-
-  def cancelUnderTrace(def publisherSupplier) {
-    runWithSpan("trace-parent") {
-      def tracer = GlobalOpenTelemetry.getTracer("test")
-      def span = tracer.spanBuilder("publisher-parent").startSpan()
-      def scope = Context.current().with(span).makeCurrent()
-
-      def publisher = publisherSupplier()
-      publisher.subscribe(new Subscriber<Integer>() {
-        void onSubscribe(Subscription subscription) {
-          subscription.cancel()
-        }
-
-        void onNext(Integer t) {
-        }
-
-        void onError(Throwable error) {
-        }
-
-        void onComplete() {
-        }
-      })
-
-      span.end()
-      scope.close()
-    }
-  }
-
-  int addOneFunc(int i) {
-    runWithSpan("add one") {}
-    return i + 1
-  }
-
-  int addTwoFunc(int i) {
-    runWithSpan("add two") {}
-    return i + 2
-  }
-}

+ 0 - 58
instrumentation/reactor/reactor-3.1/testing/src/main/groovy/io/opentelemetry/instrumentation/reactor/AbstractSubscriptionTest.groovy

@@ -1,58 +0,0 @@
-/*
- * Copyright The OpenTelemetry Authors
- * SPDX-License-Identifier: Apache-2.0
- */
-
-package io.opentelemetry.instrumentation.reactor
-
-import io.opentelemetry.api.GlobalOpenTelemetry
-import io.opentelemetry.api.trace.SpanKind
-import io.opentelemetry.instrumentation.test.InstrumentationSpecification
-import reactor.core.publisher.Mono
-
-import java.time.Duration
-import java.util.concurrent.CountDownLatch
-
-abstract class AbstractSubscriptionTest extends InstrumentationSpecification {
-
-  def "subscription test"() {
-    when:
-    Mono<Connection> connection = Mono.create {
-      it.success(new Connection())
-    }
-    CountDownLatch latch = new CountDownLatch(1)
-    runWithSpan("parent") {
-      connection
-        .delayElement(Duration.ofMillis(1))
-        .subscribe {
-          it.query()
-          latch.countDown()
-        }
-    }
-    latch.await()
-
-    then:
-    assertTraces(1) {
-      trace(0, 2) {
-        span(0) {
-          name "parent"
-          kind SpanKind.INTERNAL
-          hasNoParent()
-        }
-        span(1) {
-          name "Connection.query"
-          kind SpanKind.INTERNAL
-          childOf span(0)
-        }
-      }
-    }
-  }
-
-  static class Connection {
-    static int query() {
-      def span = GlobalOpenTelemetry.getTracer("test").spanBuilder("Connection.query").startSpan()
-      span.end()
-      return new Random().nextInt()
-    }
-  }
-}

+ 578 - 0
instrumentation/reactor/reactor-3.1/testing/src/main/java/io/opentelemetry/instrumentation/reactor/AbstractReactorCoreTest.java

@@ -0,0 +1,578 @@
+/*
+ * Copyright The OpenTelemetry Authors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package io.opentelemetry.instrumentation.reactor;
+
+import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.attributeEntry;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+import io.opentelemetry.api.trace.Span;
+import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension;
+import io.opentelemetry.sdk.testing.assertj.TraceAssert;
+import io.opentelemetry.sdk.trace.data.StatusData;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.stream.Stream;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInstance;
+import org.reactivestreams.Subscriber;
+import org.reactivestreams.Subscription;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
+@TestInstance(TestInstance.Lifecycle.PER_CLASS)
+public abstract class AbstractReactorCoreTest {
+
+  private final InstrumentationExtension testing;
+
+  protected AbstractReactorCoreTest(InstrumentationExtension testing) {
+    this.testing = testing;
+  }
+
+  @Test
+  void basicMono() {
+    int result = testing.runWithSpan("parent", () -> Mono.just(1).map(this::addOne).block());
+    assertThat(result).isEqualTo(2);
+
+    testing.waitAndAssertTraces(
+        trace ->
+            trace.hasSpansSatisfyingExactly(
+                span -> span.hasName("parent").hasNoParent(),
+                span -> span.hasName("add one").hasParent(trace.getSpan(0))));
+  }
+
+  @Test
+  void twoOperationsMono() {
+    int result =
+        testing.runWithSpan(
+            "parent", () -> Mono.just(2).map(this::addOne).map(this::addOne).block());
+    assertThat(result).isEqualTo(4);
+
+    testing.waitAndAssertTraces(
+        trace ->
+            trace.hasSpansSatisfyingExactly(
+                span -> span.hasName("parent").hasNoParent(),
+                span -> span.hasName("add one").hasParent(trace.getSpan(0)),
+                span -> span.hasName("add one").hasParent(trace.getSpan(0))));
+  }
+
+  @Test
+  void delayedMono() {
+    int result =
+        testing.runWithSpan(
+            "parent",
+            () -> Mono.just(3).delayElement(Duration.ofMillis(1)).map(this::addOne).block());
+    assertThat(result).isEqualTo(4);
+
+    testing.waitAndAssertTraces(
+        trace ->
+            trace.hasSpansSatisfyingExactly(
+                span -> span.hasName("parent").hasNoParent(),
+                span -> span.hasName("add one").hasParent(trace.getSpan(0))));
+  }
+
+  @Test
+  void delayedTwiceMono() {
+    int result =
+        testing.runWithSpan(
+            "parent",
+            () ->
+                Mono.just(4)
+                    .delayElement(Duration.ofMillis(1))
+                    .map(this::addOne)
+                    .delayElement(Duration.ofMillis(1))
+                    .map(this::addOne)
+                    .block());
+    assertThat(result).isEqualTo(6);
+
+    testing.waitAndAssertTraces(
+        trace ->
+            trace.hasSpansSatisfyingExactly(
+                span -> span.hasName("parent").hasNoParent(),
+                span -> span.hasName("add one").hasParent(trace.getSpan(0)),
+                span -> span.hasName("add one").hasParent(trace.getSpan(0))));
+  }
+
+  @Test
+  void basicFlux() {
+    List<Integer> result =
+        testing.runWithSpan(
+            "parent",
+            () -> Flux.fromStream(Stream.of(5, 6)).map(this::addOne).collectList().block());
+    assertThat(result).containsExactly(6, 7);
+
+    testing.waitAndAssertTraces(
+        trace ->
+            trace.hasSpansSatisfyingExactly(
+                span -> span.hasName("parent").hasNoParent(),
+                span -> span.hasName("add one").hasParent(trace.getSpan(0)),
+                span -> span.hasName("add one").hasParent(trace.getSpan(0))));
+  }
+
+  @Test
+  void twoOperationsFlux() {
+    List<Integer> result =
+        testing.runWithSpan(
+            "parent",
+            () ->
+                Flux.fromStream(Stream.of(6, 7))
+                    .map(this::addOne)
+                    .map(this::addOne)
+                    .collectList()
+                    .block());
+    assertThat(result).containsExactly(8, 9);
+
+    testing.waitAndAssertTraces(
+        trace ->
+            trace.hasSpansSatisfyingExactly(
+                span -> span.hasName("parent").hasNoParent(),
+                span -> span.hasName("add one").hasParent(trace.getSpan(0)),
+                span -> span.hasName("add one").hasParent(trace.getSpan(0)),
+                span -> span.hasName("add one").hasParent(trace.getSpan(0)),
+                span -> span.hasName("add one").hasParent(trace.getSpan(0))));
+  }
+
+  @Test
+  void delayedFlux() {
+    List<Integer> result =
+        testing.runWithSpan(
+            "parent",
+            () ->
+                Flux.fromStream(Stream.of(7, 8))
+                    .delayElements(Duration.ofMillis(1))
+                    .map(this::addOne)
+                    .collectList()
+                    .block());
+    assertThat(result).containsExactly(8, 9);
+
+    testing.waitAndAssertTraces(
+        trace ->
+            trace.hasSpansSatisfyingExactly(
+                span -> span.hasName("parent").hasNoParent(),
+                span -> span.hasName("add one").hasParent(trace.getSpan(0)),
+                span -> span.hasName("add one").hasParent(trace.getSpan(0))));
+  }
+
+  @Test
+  void delayedTwiceFlux() {
+    List<Integer> result =
+        testing.runWithSpan(
+            "parent",
+            () ->
+                Flux.fromStream(Stream.of(8, 9))
+                    .delayElements(Duration.ofMillis(1))
+                    .map(this::addOne)
+                    .delayElements(Duration.ofMillis(1))
+                    .map(this::addOne)
+                    .collectList()
+                    .block());
+    assertThat(result).containsExactly(10, 11);
+
+    testing.waitAndAssertTraces(
+        trace ->
+            trace.hasSpansSatisfyingExactly(
+                span -> span.hasName("parent").hasNoParent(),
+                span -> span.hasName("add one").hasParent(trace.getSpan(0)),
+                span -> span.hasName("add one").hasParent(trace.getSpan(0)),
+                span -> span.hasName("add one").hasParent(trace.getSpan(0)),
+                span -> span.hasName("add one").hasParent(trace.getSpan(0))));
+  }
+
+  @Test
+  void monoFromCallable() {
+    int result =
+        testing.runWithSpan(
+            "parent", () -> Mono.fromCallable(() -> addOne(10)).map(this::addOne).block());
+    assertThat(result).isEqualTo(12);
+
+    testing.waitAndAssertTraces(
+        trace ->
+            trace.hasSpansSatisfyingExactly(
+                span -> span.hasName("parent").hasNoParent(),
+                span -> span.hasName("add one").hasParent(trace.getSpan(0)),
+                span -> span.hasName("add one").hasParent(trace.getSpan(0))));
+  }
+
+  @Test
+  void monoError() {
+    IllegalStateException error = new IllegalStateException("exception");
+    assertThatThrownBy(() -> testing.runWithSpan("parent", () -> Mono.error(error).block()))
+        .isEqualTo(error);
+
+    testing.waitAndAssertTraces(
+        trace ->
+            trace.hasSpansSatisfyingExactly(
+                span ->
+                    span.hasName("parent")
+                        .hasNoParent()
+                        .hasStatus(StatusData.error())
+                        .hasException(error)));
+  }
+
+  @Test
+  void fluxError() {
+    IllegalStateException error = new IllegalStateException("exception");
+    assertThatThrownBy(
+            () -> testing.runWithSpan("parent", () -> Flux.error(error).collectList().block()))
+        .isEqualTo(error);
+
+    testing.waitAndAssertTraces(
+        trace ->
+            trace.hasSpansSatisfyingExactly(
+                span ->
+                    span.hasName("parent")
+                        .hasNoParent()
+                        .hasStatus(StatusData.error())
+                        .hasException(error)));
+  }
+
+  @Test
+  void monoStepError() {
+    IllegalStateException error = new IllegalStateException("exception");
+    assertThatThrownBy(
+            () ->
+                testing.runWithSpan(
+                    "parent",
+                    () ->
+                        Mono.just(1)
+                            .map(this::addOne)
+                            .map(
+                                unused -> {
+                                  throw error;
+                                })
+                            .block()))
+        .isEqualTo(error);
+
+    testing.waitAndAssertTraces(
+        trace ->
+            trace.hasSpansSatisfyingExactly(
+                span ->
+                    span.hasName("parent")
+                        .hasNoParent()
+                        .hasStatus(StatusData.error())
+                        .hasException(error),
+                span ->
+                    span.hasName("add one")
+                        .hasParent(trace.getSpan(0))
+                        .hasStatus(StatusData.unset())));
+  }
+
+  @Test
+  void fluxStepError() {
+    IllegalStateException error = new IllegalStateException("exception");
+    assertThatThrownBy(
+            () ->
+                testing.runWithSpan(
+                    "parent",
+                    () ->
+                        Flux.just(5, 6)
+                            .map(this::addOne)
+                            .map(
+                                unused -> {
+                                  throw error;
+                                })
+                            .collectList()
+                            .block()))
+        .isEqualTo(error);
+
+    testing.waitAndAssertTraces(
+        trace ->
+            trace.hasSpansSatisfyingExactly(
+                span ->
+                    span.hasName("parent")
+                        .hasNoParent()
+                        .hasStatus(StatusData.error())
+                        .hasException(error),
+                span ->
+                    span.hasName("add one")
+                        .hasParent(trace.getSpan(0))
+                        .hasStatus(StatusData.unset())));
+  }
+
+  @Test
+  void cancelMono() {
+    testing.runWithSpan("parent", () -> Mono.just(1).subscribe(CancellingSubscriber.INSTANCE));
+
+    testing.waitAndAssertTraces(
+        trace ->
+            trace.hasSpansSatisfyingExactly(
+                span -> span.hasName("parent").hasNoParent().hasStatus(StatusData.unset())));
+  }
+
+  @Test
+  void cancelFlux() {
+    testing.runWithSpan("parent", () -> Flux.just(3, 4).subscribe(CancellingSubscriber.INSTANCE));
+
+    testing.waitAndAssertTraces(
+        trace ->
+            trace.hasSpansSatisfyingExactly(
+                span -> span.hasName("parent").hasNoParent().hasStatus(StatusData.unset())));
+  }
+
+  @Test
+  void monoChain() {
+    int result =
+        testing.runWithSpan(
+            "parent",
+            () ->
+                Mono.just(1)
+                    .map(this::addOne)
+                    .map(this::addOne)
+                    .then(Mono.just(1).map(this::addOne))
+                    .block());
+    assertThat(result).isEqualTo(2);
+
+    testing.waitAndAssertTraces(
+        trace ->
+            trace.hasSpansSatisfyingExactly(
+                span -> span.hasName("parent").hasNoParent(),
+                span -> span.hasName("add one").hasParent(trace.getSpan(0)),
+                span -> span.hasName("add one").hasParent(trace.getSpan(0)),
+                span -> span.hasName("add one").hasParent(trace.getSpan(0))));
+  }
+
+  @Test
+  void fluxChain() {
+    int result =
+        testing.runWithSpan(
+            "parent",
+            () ->
+                Flux.just(5, 6)
+                    .map(this::addOne)
+                    .map(this::addOne)
+                    .then(Mono.just(1).map(this::addOne))
+                    .block());
+    assertThat(result).isEqualTo(2);
+
+    testing.waitAndAssertTraces(
+        trace ->
+            trace.hasSpansSatisfyingExactly(
+                span -> span.hasName("parent").hasNoParent(),
+                span -> span.hasName("add one").hasParent(trace.getSpan(0)),
+                span -> span.hasName("add one").hasParent(trace.getSpan(0)),
+                span -> span.hasName("add one").hasParent(trace.getSpan(0)),
+                span -> span.hasName("add one").hasParent(trace.getSpan(0)),
+                span -> span.hasName("add one").hasParent(trace.getSpan(0))));
+  }
+
+  @Test
+  void monoChainHasAssemblyContext() {
+    int result =
+        testing.runWithSpan(
+            "parent",
+            () -> {
+              Mono<Integer> mono = Mono.just(1).map(this::addOne);
+              return testing.runWithSpan("intermediate", () -> mono.map(this::addTwo)).block();
+            });
+    assertThat(result).isEqualTo(4);
+
+    testing.waitAndAssertTraces(
+        trace ->
+            trace.hasSpansSatisfyingExactly(
+                span -> span.hasName("parent").hasNoParent(),
+                span -> span.hasName("intermediate").hasParent(trace.getSpan(0)),
+                span -> span.hasName("add one").hasParent(trace.getSpan(0)),
+                span -> span.hasName("add two").hasParent(trace.getSpan(0))));
+  }
+
+  @Test
+  void fluxChainHasAssemblyContext() {
+    List<Integer> result =
+        testing.runWithSpan(
+            "parent",
+            () -> {
+              Flux<Integer> flux = Flux.just(1, 2).map(this::addOne);
+              return testing
+                  .runWithSpan("intermediate", () -> flux.map(this::addTwo))
+                  .collectList()
+                  .block();
+            });
+    assertThat(result).containsExactly(4, 5);
+
+    testing.waitAndAssertTraces(
+        trace ->
+            trace.hasSpansSatisfyingExactly(
+                span -> span.hasName("parent").hasNoParent(),
+                span -> span.hasName("intermediate").hasParent(trace.getSpan(0)),
+                span -> span.hasName("add one").hasParent(trace.getSpan(0)),
+                span -> span.hasName("add two").hasParent(trace.getSpan(0)),
+                span -> span.hasName("add one").hasParent(trace.getSpan(0)),
+                span -> span.hasName("add two").hasParent(trace.getSpan(0))));
+  }
+
+  @Test
+  void nestedDelayedMonoHighConcurrency() {
+    for (int i = 0; i < 100; i++) {
+      int iteration = i;
+      Mono<String> outer =
+          Mono.just("")
+              .map(Function.identity())
+              .delayElement(Duration.ofMillis(1))
+              .map(Function.identity())
+              .delayElement(Duration.ofMillis(1))
+              .doOnSuccess(
+                  unused -> {
+                    Mono<String> middle =
+                        Mono.just("")
+                            .map(Function.identity())
+                            .doOnSuccess(
+                                unused2 ->
+                                    testing.runWithSpan(
+                                        "inner",
+                                        () -> Span.current().setAttribute("iteration", iteration)));
+
+                    testing.runWithSpan(
+                        "middle",
+                        () -> {
+                          Span.current().setAttribute("iteration", iteration);
+                          middle.subscribe();
+                        });
+                  });
+
+      // Context must propagate even if only subscribe is in root span scope
+      testing.runWithSpan(
+          "outer",
+          () -> {
+            Span.current().setAttribute("iteration", iteration);
+            outer.subscribe();
+          });
+    }
+
+    List<Consumer<TraceAssert>> assertions = new ArrayList<>();
+    for (int i = 0; i < 100; i++) {
+      int iteration = i;
+      assertions.add(
+          trace ->
+              trace.hasSpansSatisfyingExactly(
+                  span ->
+                      span.hasName("outer")
+                          .hasNoParent()
+                          .hasAttributes(attributeEntry("iteration", iteration)),
+                  span ->
+                      span.hasName("middle")
+                          .hasParent(trace.getSpan(0))
+                          .hasAttributes(attributeEntry("iteration", iteration)),
+                  span ->
+                      span.hasName("inner")
+                          .hasParent(trace.getSpan(1))
+                          .hasAttributes(attributeEntry("iteration", iteration))));
+    }
+    testing.waitAndAssertTraces(assertions);
+  }
+
+  @Test
+  void nestedDelayedFluxHighConcurrency() {
+    for (int i = 0; i < 100; i++) {
+      int iteration = i;
+      Flux<String> outer =
+          Flux.just("a", "b")
+              .map(Function.identity())
+              .delayElements(Duration.ofMillis(1))
+              .map(Function.identity())
+              .delayElements(Duration.ofMillis(1))
+              .doOnEach(
+                  middleSignal -> {
+                    if (middleSignal.hasValue()) {
+                      String value = middleSignal.get();
+                      Flux<String> middle =
+                          Flux.just("c", "d")
+                              .map(Function.identity())
+                              .delayElements(Duration.ofMillis(1))
+                              .doOnEach(
+                                  innerSignal -> {
+                                    if (innerSignal.hasValue()) {
+                                      testing.runWithSpan(
+                                          "inner " + value + innerSignal.get(),
+                                          () ->
+                                              Span.current().setAttribute("iteration", iteration));
+                                    }
+                                  });
+
+                      testing.runWithSpan(
+                          "middle " + value,
+                          () -> {
+                            Span.current().setAttribute("iteration", iteration);
+                            middle.subscribe();
+                          });
+                    }
+                  });
+
+      // Context must propagate even if only subscribe is in root span scope
+      testing.runWithSpan(
+          "outer",
+          () -> {
+            Span.current().setAttribute("iteration", iteration);
+            outer.subscribe();
+          });
+    }
+
+    List<Consumer<TraceAssert>> assertions = new ArrayList<>();
+    for (int i = 0; i < 100; i++) {
+      int iteration = i;
+      assertions.add(
+          trace ->
+              trace.hasSpansSatisfyingExactly(
+                  span ->
+                      span.hasName("outer")
+                          .hasNoParent()
+                          .hasAttributes(attributeEntry("iteration", iteration)),
+                  span ->
+                      span.hasName("middle a")
+                          .hasParent(trace.getSpan(0))
+                          .hasAttributes(attributeEntry("iteration", iteration)),
+                  span ->
+                      span.hasName("inner ac")
+                          .hasParent(trace.getSpan(1))
+                          .hasAttributes(attributeEntry("iteration", iteration)),
+                  span ->
+                      span.hasName("inner ad")
+                          .hasParent(trace.getSpan(1))
+                          .hasAttributes(attributeEntry("iteration", iteration)),
+                  span ->
+                      span.hasName("middle b")
+                          .hasParent(trace.getSpan(0))
+                          .hasAttributes(attributeEntry("iteration", iteration)),
+                  span ->
+                      span.hasName("inner bc")
+                          .hasParent(trace.getSpan(4))
+                          .hasAttributes(attributeEntry("iteration", iteration)),
+                  span ->
+                      span.hasName("inner bd")
+                          .hasParent(trace.getSpan(4))
+                          .hasAttributes(attributeEntry("iteration", iteration))));
+    }
+    testing.waitAndAssertTraces(assertions);
+  }
+
+  private int addOne(int i) {
+    return testing.runWithSpan("add one", () -> i + 1);
+  }
+
+  private int addTwo(int i) {
+    return testing.runWithSpan("add two", () -> i + 2);
+  }
+
+  private enum CancellingSubscriber implements Subscriber<Integer> {
+    INSTANCE;
+
+    @Override
+    public void onSubscribe(Subscription subscription) {
+      subscription.cancel();
+    }
+
+    @Override
+    public void onNext(Integer integer) {}
+
+    @Override
+    public void onError(Throwable throwable) {}
+
+    @Override
+    public void onComplete() {}
+  }
+}

+ 41 - 0
instrumentation/reactor/reactor-3.1/testing/src/main/java/io/opentelemetry/instrumentation/reactor/AbstractSubscriptionTest.java

@@ -0,0 +1,41 @@
+/*
+ * Copyright The OpenTelemetry Authors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package io.opentelemetry.instrumentation.reactor;
+
+import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension;
+import java.time.Duration;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInstance;
+import reactor.core.publisher.Mono;
+
+@TestInstance(TestInstance.Lifecycle.PER_CLASS)
+public abstract class AbstractSubscriptionTest {
+
+  private final InstrumentationExtension testing;
+
+  protected AbstractSubscriptionTest(InstrumentationExtension testing) {
+    this.testing = testing;
+  }
+
+  @Test
+  void subscription() {
+    Mono<Connection> connection = Mono.create(sink -> sink.success(new Connection()));
+    testing.runWithSpan(
+        "parent", () -> connection.delayElement(Duration.ofMillis(1)).subscribe(Connection::query));
+
+    testing.waitAndAssertTraces(
+        trace ->
+            trace.hasSpansSatisfyingExactly(
+                span -> span.hasName("parent").hasNoParent(),
+                span -> span.hasName("Connection.query").hasParent(trace.getSpan(0))));
+  }
+
+  private class Connection {
+    void query() {
+      testing.runWithSpan("Connection.query", () -> {});
+    }
+  }
+}