Browse Source

Stop kotlin coroutine dispatcher from propagating context (#11500)

Lauri Tulmin 9 months ago
parent
commit
0ea05a8420

+ 1 - 0
instrumentation/kotlinx-coroutines/kotlinx-coroutines-1.0/javaagent/build.gradle.kts

@@ -42,6 +42,7 @@ dependencies {
 
   testLibrary("org.jetbrains.kotlinx:kotlinx-coroutines-core:1.0.0")
   testLibrary("org.jetbrains.kotlinx:kotlinx-coroutines-reactor:1.0.0")
+  testLibrary("io.vertx:vertx-lang-kotlin-coroutines:3.6.0")
 }
 
 kotlin {

+ 48 - 0
instrumentation/kotlinx-coroutines/kotlinx-coroutines-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kotlinxcoroutines/KotlinCoroutineDispatcherInstrumentation.java

@@ -0,0 +1,48 @@
+/*
+ * Copyright The OpenTelemetry Authors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package io.opentelemetry.javaagent.instrumentation.kotlinxcoroutines;
+
+import static io.opentelemetry.javaagent.extension.matcher.AgentElementMatchers.extendsClass;
+import static io.opentelemetry.javaagent.extension.matcher.AgentElementMatchers.hasClassesNamed;
+import static net.bytebuddy.matcher.ElementMatchers.named;
+import static net.bytebuddy.matcher.ElementMatchers.takesArgument;
+
+import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
+import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer;
+import net.bytebuddy.asm.Advice;
+import net.bytebuddy.description.type.TypeDescription;
+import net.bytebuddy.matcher.ElementMatcher;
+
+public class KotlinCoroutineDispatcherInstrumentation implements TypeInstrumentation {
+
+  @Override
+  public ElementMatcher<ClassLoader> classLoaderOptimization() {
+    return hasClassesNamed("kotlinx.coroutines.CoroutineDispatcher");
+  }
+
+  @Override
+  public ElementMatcher<TypeDescription> typeMatcher() {
+    return extendsClass(named("kotlinx.coroutines.CoroutineDispatcher"));
+  }
+
+  @Override
+  public void transform(TypeTransformer transformer) {
+    transformer.applyAdviceToMethod(
+        named("dispatch").and(takesArgument(1, Runnable.class)),
+        this.getClass().getName() + "$StopContextPropagationAdvice");
+  }
+
+  @SuppressWarnings("unused")
+  public static class StopContextPropagationAdvice {
+
+    @Advice.OnMethodEnter
+    public static void enter(@Advice.Argument(value = 1, readOnly = false) Runnable runnable) {
+      if (runnable != null) {
+        runnable = RunnableWrapper.stopPropagation(runnable);
+      }
+    }
+  }
+}

+ 3 - 2
instrumentation/kotlinx-coroutines/kotlinx-coroutines-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kotlinxcoroutines/KotlinCoroutinesInstrumentationModule.java

@@ -5,7 +5,7 @@
 
 package io.opentelemetry.javaagent.instrumentation.kotlinxcoroutines;
 
-import static java.util.Collections.singletonList;
+import static java.util.Arrays.asList;
 
 import com.google.auto.service.AutoService;
 import io.opentelemetry.javaagent.extension.instrumentation.InstrumentationModule;
@@ -35,6 +35,7 @@ public class KotlinCoroutinesInstrumentationModule extends InstrumentationModule
 
   @Override
   public List<TypeInstrumentation> typeInstrumentations() {
-    return singletonList(new KotlinCoroutinesInstrumentation());
+    return asList(
+        new KotlinCoroutinesInstrumentation(), new KotlinCoroutineDispatcherInstrumentation());
   }
 }

+ 22 - 0
instrumentation/kotlinx-coroutines/kotlinx-coroutines-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kotlinxcoroutines/RunnableWrapper.java

@@ -0,0 +1,22 @@
+/*
+ * Copyright The OpenTelemetry Authors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package io.opentelemetry.javaagent.instrumentation.kotlinxcoroutines;
+
+import io.opentelemetry.context.Context;
+import io.opentelemetry.context.Scope;
+
+public final class RunnableWrapper {
+
+  public static Runnable stopPropagation(Runnable runnable) {
+    return () -> {
+      try (Scope ignored = Context.root().makeCurrent()) {
+        runnable.run();
+      }
+    };
+  }
+
+  private RunnableWrapper() {}
+}

+ 39 - 3
instrumentation/kotlinx-coroutines/kotlinx-coroutines-1.0/javaagent/src/test/kotlin/io/opentelemetry/javaagent/instrumentation/kotlinxcoroutines/KotlinCoroutinesInstrumentationTest.kt

@@ -19,6 +19,8 @@ import io.opentelemetry.instrumentation.testing.util.TelemetryDataUtil.orderByRo
 import io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.equalTo
 import io.opentelemetry.sdk.testing.assertj.TraceAssert
 import io.opentelemetry.semconv.incubating.CodeIncubatingAttributes
+import io.vertx.core.Vertx
+import io.vertx.kotlin.coroutines.dispatcher
 import kotlinx.coroutines.CompletableDeferred
 import kotlinx.coroutines.CoroutineDispatcher
 import kotlinx.coroutines.CoroutineScope
@@ -41,6 +43,7 @@ import kotlinx.coroutines.withTimeout
 import kotlinx.coroutines.yield
 import org.assertj.core.api.Assertions.assertThat
 import org.junit.jupiter.api.AfterAll
+import org.junit.jupiter.api.Assumptions
 import org.junit.jupiter.api.Test
 import org.junit.jupiter.api.TestInstance
 import org.junit.jupiter.api.extension.ExtensionContext
@@ -63,17 +66,20 @@ class KotlinCoroutinesInstrumentationTest {
   companion object {
     val threadPool = Executors.newFixedThreadPool(2)
     val singleThread = Executors.newSingleThreadExecutor()
+    val vertx = Vertx.vertx()
+
+    @JvmStatic
+    @RegisterExtension
+    val testing = AgentInstrumentationExtension.create()
   }
 
   @AfterAll
   fun shutdown() {
     threadPool.shutdown()
     singleThread.shutdown()
+    vertx.close()
   }
 
-  @RegisterExtension
-  val testing = AgentInstrumentationExtension.create()
-
   val tracer = testing.openTelemetry.getTracer("test")
 
   @ParameterizedTest
@@ -517,6 +523,7 @@ class KotlinCoroutinesInstrumentationTest {
       arguments(DispatcherWrapper(Dispatchers.Unconfined)),
       arguments(DispatcherWrapper(threadPool.asCoroutineDispatcher())),
       arguments(DispatcherWrapper(singleThread.asCoroutineDispatcher())),
+      arguments(DispatcherWrapper(vertx.dispatcher()))
     )
   }
 
@@ -559,4 +566,33 @@ class KotlinCoroutinesInstrumentationTest {
       return otelContext.makeCurrent()
     }
   }
+
+  // regression test for
+  // https://github.com/open-telemetry/opentelemetry-java-instrumentation/issues/11411
+  @ParameterizedTest
+  @ArgumentsSource(DispatchersSource::class)
+  fun `dispatch does not propagate context`(dispatcher: DispatcherWrapper) {
+    Assumptions.assumeTrue(dispatcher.dispatcher != Dispatchers.Unconfined)
+
+    runTest(dispatcher) {
+      dispatcher.dispatcher.dispatch(coroutineContext) {
+        tracer.spanBuilder("dispatched").startSpan().end()
+      }
+    }
+
+    testing.waitAndAssertTraces(
+      { trace ->
+        trace.hasSpansSatisfyingExactly({
+          it.hasName("parent")
+            .hasNoParent()
+        })
+      },
+      { trace ->
+        trace.hasSpansSatisfyingExactly({
+          it.hasName("dispatched")
+            .hasNoParent()
+        })
+      }
+    )
+  }
 }