Browse Source

Reactor instrumentation: do not make root context current (#6593)

Co-authored-by: Trask Stalnaker <trask.stalnaker@gmail.com>
Liudmila Molkova 2 years ago
parent
commit
97bc4a4fda

+ 5 - 1
instrumentation/reactor/reactor-3.1/library/src/main/java/io/opentelemetry/instrumentation/reactor/TracingSubscriber.java

@@ -20,6 +20,7 @@
 
 
 package io.opentelemetry.instrumentation.reactor;
 package io.opentelemetry.instrumentation.reactor;
 
 
+import io.opentelemetry.api.trace.Span;
 import io.opentelemetry.context.Scope;
 import io.opentelemetry.context.Scope;
 import org.reactivestreams.Subscriber;
 import org.reactivestreams.Subscriber;
 import org.reactivestreams.Subscription;
 import org.reactivestreams.Subscription;
@@ -34,6 +35,7 @@ public class TracingSubscriber<T> implements CoreSubscriber<T> {
   private final io.opentelemetry.context.Context traceContext;
   private final io.opentelemetry.context.Context traceContext;
   private final Subscriber<? super T> subscriber;
   private final Subscriber<? super T> subscriber;
   private final Context context;
   private final Context context;
+  private final boolean hasContextToPropagate;
 
 
   public TracingSubscriber(Subscriber<? super T> subscriber, Context ctx) {
   public TracingSubscriber(Subscriber<? super T> subscriber, Context ctx) {
     this(subscriber, ctx, io.opentelemetry.context.Context.current());
     this(subscriber, ctx, io.opentelemetry.context.Context.current());
@@ -46,6 +48,8 @@ public class TracingSubscriber<T> implements CoreSubscriber<T> {
     this.subscriber = subscriber;
     this.subscriber = subscriber;
     this.context = ctx;
     this.context = ctx;
     this.traceContext = ContextPropagationOperator.getOpenTelemetryContext(ctx, contextToPropagate);
     this.traceContext = ContextPropagationOperator.getOpenTelemetryContext(ctx, contextToPropagate);
+    this.hasContextToPropagate =
+        traceContext == null ? false : Span.fromContext(traceContext).getSpanContext().isValid();
   }
   }
 
 
   @Override
   @Override
@@ -74,7 +78,7 @@ public class TracingSubscriber<T> implements CoreSubscriber<T> {
   }
   }
 
 
   private void withActiveSpan(Runnable runnable) {
   private void withActiveSpan(Runnable runnable) {
-    if (traceContext != null) {
+    if (hasContextToPropagate) {
       try (Scope ignored = traceContext.makeCurrent()) {
       try (Scope ignored = traceContext.makeCurrent()) {
         runnable.run();
         runnable.run();
       }
       }

+ 133 - 0
instrumentation/reactor/reactor-3.1/library/src/test/java/io/opentelemetry/instrumentation/reactor/ReactorCoreTest.java

@@ -7,13 +7,16 @@ package io.opentelemetry.instrumentation.reactor;
 
 
 import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.attributeEntry;
 import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.attributeEntry;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.fail;
 
 
 import io.opentelemetry.api.trace.Span;
 import io.opentelemetry.api.trace.Span;
 import io.opentelemetry.api.trace.StatusCode;
 import io.opentelemetry.api.trace.StatusCode;
 import io.opentelemetry.api.trace.Tracer;
 import io.opentelemetry.api.trace.Tracer;
 import io.opentelemetry.context.Context;
 import io.opentelemetry.context.Context;
+import io.opentelemetry.context.Scope;
 import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension;
 import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension;
 import io.opentelemetry.instrumentation.testing.junit.LibraryInstrumentationExtension;
 import io.opentelemetry.instrumentation.testing.junit.LibraryInstrumentationExtension;
+import java.time.Duration;
 import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.Test;
@@ -22,6 +25,7 @@ import reactor.core.Scannable;
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 import reactor.core.publisher.Mono;
 import reactor.core.publisher.UnicastProcessor;
 import reactor.core.publisher.UnicastProcessor;
+import reactor.test.StepVerifier;
 
 
 class ReactorCoreTest extends AbstractReactorCoreTest {
 class ReactorCoreTest extends AbstractReactorCoreTest {
 
 
@@ -229,6 +233,135 @@ class ReactorCoreTest extends AbstractReactorCoreTest {
         .isPresent();
         .isPresent();
   }
   }
 
 
+  @Test
+  void doesNotOverrideInnerCurrentSpans() {
+    Flux<Object> publish =
+        Flux.create(
+            sink -> {
+              for (int i = 0; i < 2; i++) {
+                Span s = tracer.spanBuilder("inner").startSpan();
+                try (Scope scope = s.makeCurrent()) {
+                  sink.next(i);
+                } finally {
+                  s.end();
+                }
+              }
+            });
+
+    // as a result we'll have
+    // 1. publish subscriber that creates inner spans
+    // 2. tracing subscriber without current context - subscription was done outside any scope
+    // 3. inner subscriber that will add onNext attribute to inner spans
+    // I.e. tracing subscriber context (root) at subscription time will be different from inner in
+    // onNext
+    publish
+        .take(2)
+        .subscribe(
+            n -> {
+              assertThat(Span.current().getSpanContext().isValid()).isTrue();
+              Span.current().setAttribute("onNext", true);
+            },
+            error -> fail(error.getMessage()));
+
+    testing.waitAndAssertTraces(
+        trace ->
+            trace.hasSpansSatisfyingExactly(
+                span ->
+                    span.hasName("inner")
+                        .hasNoParent()
+                        .hasAttributes(attributeEntry("onNext", true))),
+        trace ->
+            trace.hasSpansSatisfyingExactly(
+                span ->
+                    span.hasName("inner")
+                        .hasNoParent()
+                        .hasAttributes(attributeEntry("onNext", true))));
+  }
+
+  @Test
+  void doesNotOverrideInnerCurrentSpansAsync() {
+    Flux<Object> publish =
+        Flux.create(
+            sink -> {
+              Span s = tracer.spanBuilder("inner").startSpan();
+              try (Scope scope = s.makeCurrent()) {
+                sink.next(s);
+              } finally {
+                s.end();
+              }
+            });
+
+    publish
+        .take(1)
+        .delayElements(Duration.ofMillis(1))
+        .doOnNext(
+            span -> {
+              assertThat(Span.current().getSpanContext().isValid()).isTrue();
+              assertThat(Span.current()).isSameAs(span);
+            })
+        .subscribe(
+            span -> assertThat(Span.current()).isSameAs(span), error -> fail(error.getMessage()));
+
+    testing.waitAndAssertTraces(
+        trace -> trace.hasSpansSatisfyingExactly(span -> span.hasName("inner").hasNoParent()));
+  }
+
+  @Test
+  void doesNotOverrideInnerCurrentSpansWithThereIsOuterCurrent() {
+    Flux<Object> publish =
+        Flux.create(
+            sink -> {
+              for (int i = 0; i < 2; i++) {
+                Span s = tracer.spanBuilder("inner").startSpan();
+                try (Scope scope = s.makeCurrent()) {
+                  sink.next(i);
+                } finally {
+                  s.end();
+                }
+              }
+            });
+
+    // as a result we'll have
+    // 1. publish subscriber that creates inner spans
+    // 2. tracing subscriber with outer context - it was active at subscription time
+    // 3. inner subscriber that will add onNext attribute
+    // I.e. tracing subscriber context at subscription time will be different from inner in onNext
+    Span outer = tracer.spanBuilder("outer").startSpan();
+    try (Scope scope = outer.makeCurrent()) {
+      StepVerifier.create(
+              publish
+                  .take(2)
+                  .doOnNext(
+                      n -> {
+                        assertThat(Span.current().getSpanContext().isValid()).isTrue();
+                        Span.current().setAttribute("onNext", true);
+                      })
+                  .subscriberContext(
+                      // subscribers that know that their subscription can happen
+                      // ahead of time and in the 'wrong' context, has to clean up 'wrong' context
+                      context ->
+                          ContextPropagationOperator.storeOpenTelemetryContext(
+                              context, Context.root())))
+          .expectNextCount(2)
+          .verifyComplete();
+
+      outer.end();
+    }
+
+    testing.waitAndAssertTraces(
+        trace ->
+            trace.hasSpansSatisfyingExactly(
+                span -> span.hasName("outer").hasNoParent(),
+                span ->
+                    span.hasName("inner")
+                        .hasParent(trace.getSpan(0))
+                        .hasAttributes(attributeEntry("onNext", true)),
+                span ->
+                    span.hasName("inner")
+                        .hasParent(trace.getSpan(0))
+                        .hasAttributes(attributeEntry("onNext", true))));
+  }
+
   private <T> Mono<T> monoSpan(Mono<T> mono, String spanName) {
   private <T> Mono<T> monoSpan(Mono<T> mono, String spanName) {
     return ContextPropagationOperator.ScalarPropagatingMono.create(mono)
     return ContextPropagationOperator.ScalarPropagatingMono.create(mono)
         .doOnEach(
         .doOnEach(

+ 1 - 0
instrumentation/reactor/reactor-netty/reactor-netty-1.0/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/reactornetty/v1_0/AbstractReactorNettyHttpClientTest.java

@@ -102,6 +102,7 @@ abstract class AbstractReactorNettyHttpClientTest
     options.disableTestRedirects();
     options.disableTestRedirects();
     options.enableTestReadTimeout();
     options.enableTestReadTimeout();
     options.setUserAgent(USER_AGENT);
     options.setUserAgent(USER_AGENT);
+    options.enableTestCallbackWithImplicitParent();
 
 
     options.setClientSpanErrorMapper(
     options.setClientSpanErrorMapper(
         (uri, exception) -> {
         (uri, exception) -> {

+ 5 - 0
instrumentation/spring/spring-webflux-5.0/javaagent/src/test/groovy/client/SpringWebfluxHttpClientTest.groovy

@@ -80,6 +80,11 @@ class SpringWebfluxHttpClientTest extends HttpClientTest<WebClient.RequestBodySp
     false
     false
   }
   }
 
 
+  @Override
+  boolean testCallbackWithImplicitParent() {
+    true
+  }
+
   @Override
   @Override
   Set<AttributeKey<?>> httpAttributes(URI uri) {
   Set<AttributeKey<?>> httpAttributes(URI uri) {
     def attributes = super.httpAttributes(uri)
     def attributes = super.httpAttributes(uri)

+ 21 - 0
testing-common/src/main/groovy/io/opentelemetry/instrumentation/test/base/HttpClientTest.groovy

@@ -6,6 +6,7 @@
 package io.opentelemetry.instrumentation.test.base
 package io.opentelemetry.instrumentation.test.base
 
 
 import static org.junit.jupiter.api.Assumptions.assumeTrue
 import static org.junit.jupiter.api.Assumptions.assumeTrue
+import static org.junit.jupiter.api.Assumptions.assumeFalse
 
 
 import io.opentelemetry.api.common.AttributeKey
 import io.opentelemetry.api.common.AttributeKey
 import io.opentelemetry.api.trace.SpanId
 import io.opentelemetry.api.trace.SpanId
@@ -221,6 +222,11 @@ abstract class HttpClientTest<REQUEST> extends InstrumentationSpecification {
       return HttpClientTest.this.testCallbackWithParent()
       return HttpClientTest.this.testCallbackWithParent()
     }
     }
 
 
+    @Override
+    protected boolean testCallbackWithImplicitParent() {
+      return HttpClientTest.this.testCallbackWithImplicitParent()
+    }
+
     @Override
     @Override
     protected boolean testErrorWithCallback() {
     protected boolean testErrorWithCallback() {
       return HttpClientTest.this.testErrorWithCallback()
       return HttpClientTest.this.testErrorWithCallback()
@@ -294,10 +300,18 @@ abstract class HttpClientTest<REQUEST> extends InstrumentationSpecification {
 
 
   def "trace request with callback and no parent"() {
   def "trace request with callback and no parent"() {
     assumeTrue(testCallback())
     assumeTrue(testCallback())
+    assumeFalse(testCallbackWithImplicitParent())
     expect:
     expect:
     junitTest.requestWithCallbackAndNoParent()
     junitTest.requestWithCallbackAndNoParent()
   }
   }
 
 
+  def "trace request with callback and implicit parent"() {
+    assumeTrue(testCallback())
+    assumeTrue(testCallbackWithImplicitParent())
+    expect:
+    junitTest.requestWithCallbackAndImplicitParent()
+  }
+
   def "basic request with 1 redirect"() {
   def "basic request with 1 redirect"() {
     assumeTrue(testRedirects())
     assumeTrue(testRedirects())
     expect:
     expect:
@@ -497,6 +511,13 @@ abstract class HttpClientTest<REQUEST> extends InstrumentationSpecification {
     true
     true
   }
   }
 
 
+  boolean testCallbackWithImplicitParent() {
+    // depending on async behavior callback can be executed within
+    // parent span scope or outside of the scope, e.g. in reactor-netty or spring
+    // callback is correlated.
+    false
+  }
+
   boolean testErrorWithCallback() {
   boolean testErrorWithCallback() {
     return true
     return true
   }
   }

+ 35 - 1
testing-common/src/main/java/io/opentelemetry/instrumentation/testing/junit/http/AbstractHttpClientTest.java

@@ -8,6 +8,7 @@ package io.opentelemetry.instrumentation.testing.junit.http;
 import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.assertThat;
 import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.assertThat;
 import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.NetTransportValues.IP_TCP;
 import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.NetTransportValues.IP_TCP;
 import static org.assertj.core.api.Assertions.catchThrowable;
 import static org.assertj.core.api.Assertions.catchThrowable;
+import static org.junit.Assume.assumeFalse;
 import static org.junit.jupiter.api.Assumptions.assumeTrue;
 import static org.junit.jupiter.api.Assumptions.assumeTrue;
 
 
 import io.opentelemetry.api.common.AttributeKey;
 import io.opentelemetry.api.common.AttributeKey;
@@ -197,7 +198,9 @@ public abstract class AbstractHttpClientTest<REQUEST> {
     if (!testErrorWithCallback()) {
     if (!testErrorWithCallback()) {
       options.disableTestErrorWithCallback();
       options.disableTestErrorWithCallback();
     }
     }
-
+    if (testCallbackWithImplicitParent()) {
+      options.enableTestCallbackWithImplicitParent();
+    }
     configure(options);
     configure(options);
   }
   }
 
 
@@ -306,6 +309,7 @@ public abstract class AbstractHttpClientTest<REQUEST> {
   @Test
   @Test
   void requestWithCallbackAndNoParent() throws Throwable {
   void requestWithCallbackAndNoParent() throws Throwable {
     assumeTrue(options.testCallback);
     assumeTrue(options.testCallback);
+    assumeFalse(options.testCallbackWithImplicitParent);
 
 
     String method = "GET";
     String method = "GET";
     URI uri = resolveAddress("/success");
     URI uri = resolveAddress("/success");
@@ -326,6 +330,29 @@ public abstract class AbstractHttpClientTest<REQUEST> {
                 span -> span.hasName("callback").hasKind(SpanKind.INTERNAL).hasNoParent()));
                 span -> span.hasName("callback").hasKind(SpanKind.INTERNAL).hasNoParent()));
   }
   }
 
 
+  @Test
+  void requestWithCallbackAndImplicitParent() throws Throwable {
+    assumeTrue(options.testCallbackWithImplicitParent);
+
+    String method = "GET";
+    URI uri = resolveAddress("/success");
+
+    RequestResult result =
+        doRequestWithCallback(method, uri, () -> testing.runWithSpan("callback", () -> {}));
+
+    assertThat(result.get()).isEqualTo(200);
+
+    testing.waitAndAssertTraces(
+        trace ->
+            trace.hasSpansSatisfyingExactly(
+                span -> assertClientSpan(span, uri, method, 200).hasNoParent(),
+                span -> assertServerSpan(span).hasParent(trace.getSpan(0)),
+                span ->
+                    span.hasName("callback")
+                        .hasKind(SpanKind.INTERNAL)
+                        .hasParent(trace.getSpan(0))));
+  }
+
   @Test
   @Test
   void basicRequestWith1Redirect() throws Exception {
   void basicRequestWith1Redirect() throws Exception {
     // TODO quite a few clients create an extra span for the redirect
     // TODO quite a few clients create an extra span for the redirect
@@ -1112,6 +1139,13 @@ public abstract class AbstractHttpClientTest<REQUEST> {
     return true;
     return true;
   }
   }
 
 
+  protected boolean testCallbackWithImplicitParent() {
+    // depending on async behavior callback can be executed within
+    // parent span scope or outside of the scope, e.g. in reactor-netty or spring
+    // callback is correlated.
+    return false;
+  }
+
   protected boolean testErrorWithCallback() {
   protected boolean testErrorWithCallback() {
     return true;
     return true;
   }
   }

+ 6 - 0
testing-common/src/main/java/io/opentelemetry/instrumentation/testing/junit/http/HttpClientTestOptions.java

@@ -56,6 +56,7 @@ public final class HttpClientTestOptions {
   boolean testCausalityWithCallback = true;
   boolean testCausalityWithCallback = true;
   boolean testCallback = true;
   boolean testCallback = true;
   boolean testCallbackWithParent = true;
   boolean testCallbackWithParent = true;
+  boolean testCallbackWithImplicitParent = false;
   boolean testErrorWithCallback = true;
   boolean testErrorWithCallback = true;
 
 
   HttpClientTestOptions() {}
   HttpClientTestOptions() {}
@@ -159,6 +160,11 @@ public final class HttpClientTestOptions {
     return this;
     return this;
   }
   }
 
 
+  public HttpClientTestOptions enableTestCallbackWithImplicitParent() {
+    testCallbackWithImplicitParent = true;
+    return this;
+  }
+
   public HttpClientTestOptions disableTestErrorWithCallback() {
   public HttpClientTestOptions disableTestErrorWithCallback() {
     testErrorWithCallback = false;
     testErrorWithCallback = false;
     return this;
     return this;