Kaynağa Gözat

StructuredTaskScope instrumentation (#11202)

Artyom Gabeev 10 ay önce
ebeveyn
işleme
35437d865f

+ 2 - 1
instrumentation/executors/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/executors/ExecutorsInstrumentationModule.java

@@ -33,6 +33,7 @@ public class ExecutorsInstrumentationModule extends InstrumentationModule {
         new JavaForkJoinTaskInstrumentation(),
         new RunnableInstrumentation(),
         new ThreadPoolExtendingExecutorInstrumentation(),
-        new VirtualThreadInstrumentation());
+        new VirtualThreadInstrumentation(),
+        new StructuredTaskScopeInstrumentation());
   }
 }

+ 57 - 0
instrumentation/executors/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/executors/StructuredTaskScopeInstrumentation.java

@@ -0,0 +1,57 @@
+/*
+ * Copyright The OpenTelemetry Authors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package io.opentelemetry.javaagent.instrumentation.executors;
+
+import static net.bytebuddy.matcher.ElementMatchers.named;
+import static net.bytebuddy.matcher.ElementMatchers.takesArgument;
+
+import io.opentelemetry.context.Context;
+import io.opentelemetry.instrumentation.api.util.VirtualField;
+import io.opentelemetry.javaagent.bootstrap.Java8BytecodeBridge;
+import io.opentelemetry.javaagent.bootstrap.executors.ExecutorAdviceHelper;
+import io.opentelemetry.javaagent.bootstrap.executors.PropagatedContext;
+import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
+import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer;
+import java.util.concurrent.Callable;
+import net.bytebuddy.asm.Advice;
+import net.bytebuddy.description.type.TypeDescription;
+import net.bytebuddy.matcher.ElementMatcher;
+
+public class StructuredTaskScopeInstrumentation implements TypeInstrumentation {
+
+  @Override
+  public ElementMatcher<TypeDescription> typeMatcher() {
+    return named("java.util.concurrent.StructuredTaskScope");
+  }
+
+  @Override
+  public void transform(TypeTransformer transformer) {
+    transformer.applyAdviceToMethod(
+        named("fork").and(takesArgument(0, Callable.class)),
+        this.getClass().getName() + "$ForkCallableAdvice");
+  }
+
+  @SuppressWarnings("unused")
+  public static class ForkCallableAdvice {
+
+    @Advice.OnMethodEnter(suppress = Throwable.class)
+    public static PropagatedContext enterCallableFork(@Advice.Argument(0) Callable<?> task) {
+      Context context = Java8BytecodeBridge.currentContext();
+      if (ExecutorAdviceHelper.shouldPropagateContext(context, task)) {
+        VirtualField<Callable<?>, PropagatedContext> virtualField =
+            VirtualField.find(Callable.class, PropagatedContext.class);
+        return ExecutorAdviceHelper.attachContextToTask(context, virtualField, task);
+      }
+      return null;
+    }
+
+    @Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class)
+    public static void exitCallableFork(
+        @Advice.Enter PropagatedContext propagatedContext, @Advice.Thrown Throwable throwable) {
+      ExecutorAdviceHelper.cleanUpAfterSubmit(propagatedContext, throwable);
+    }
+  }
+}

+ 78 - 0
instrumentation/executors/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/executors/StructuredTaskScopeTest.java

@@ -0,0 +1,78 @@
+/*
+ * Copyright The OpenTelemetry Authors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package io.opentelemetry.javaagent.instrumentation.executors;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import io.opentelemetry.api.trace.SpanKind;
+import io.opentelemetry.instrumentation.testing.junit.AgentInstrumentationExtension;
+import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension;
+import java.lang.reflect.Method;
+import java.util.concurrent.Callable;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.condition.EnabledForJreRange;
+import org.junit.jupiter.api.condition.JRE;
+import org.junit.jupiter.api.extension.RegisterExtension;
+
+@EnabledForJreRange(min = JRE.JAVA_21)
+class StructuredTaskScopeTest {
+
+  @RegisterExtension
+  static final InstrumentationExtension testing = AgentInstrumentationExtension.create();
+
+  @Test
+  void multipleForkJoin() throws Exception {
+    Class<?> sofTaskScopeClass =
+        Class.forName("java.util.concurrent.StructuredTaskScope$ShutdownOnFailure");
+    Object taskScope = sofTaskScopeClass.getDeclaredConstructor().newInstance();
+    Class<?> taskScopeClass = Class.forName("java.util.concurrent.StructuredTaskScope");
+    Method forkMethod = taskScopeClass.getDeclaredMethod("fork", Callable.class);
+    Method joinMethod = taskScopeClass.getDeclaredMethod("join");
+    Method closeMethod = taskScopeClass.getDeclaredMethod("close");
+
+    Class<?> subtaskClass = Class.forName("java.util.concurrent.StructuredTaskScope$Subtask");
+    Method getMethod = subtaskClass.getDeclaredMethod("get");
+
+    Callable<String> callable1 =
+        () -> {
+          testing.runWithSpan("task1", () -> {});
+          return "a";
+        };
+    Callable<String> callable2 =
+        () -> {
+          testing.runWithSpan("task2", () -> {});
+          return "b";
+        };
+
+    String result =
+        testing.runWithSpan(
+            "parent",
+            () -> {
+              try {
+                Object fork1 = forkMethod.invoke(taskScope, callable1);
+                Object fork2 = forkMethod.invoke(taskScope, callable2);
+                joinMethod.invoke(taskScope);
+
+                return "" + getMethod.invoke(fork1) + getMethod.invoke(fork2);
+              } catch (Exception e) {
+                throw new AssertionError(e);
+              }
+            });
+
+    assertThat(result).isEqualTo("ab");
+
+    testing.waitAndAssertTraces(
+        trace ->
+            trace.hasSpansSatisfyingExactlyInAnyOrder(
+                span -> span.hasName("parent").hasKind(SpanKind.INTERNAL).hasNoParent(),
+                span ->
+                    span.hasName("task1").hasKind(SpanKind.INTERNAL).hasParent(trace.getSpan(0)),
+                span ->
+                    span.hasName("task2").hasKind(SpanKind.INTERNAL).hasParent(trace.getSpan(0))));
+
+    closeMethod.invoke(taskScope);
+  }
+}