Sfoglia il codice sorgente

ZIO 2.0 instrumentation (#7980)

Co-authored-by: Lauri Tulmin <ltulmin@splunk.com>
Dmytro Iaroslavskyi 1 anno fa
parent
commit
511f6b7361

+ 1 - 0
docs/supported-libraries.md

@@ -132,6 +132,7 @@ These are the supported libraries and frameworks:
 | [Vert.x Kafka Client](https://vertx.io/docs/vertx-kafka-client/java/)                                                                       | 3.6+                          | N/A                                                                                                                                                                                                                                                                                                                                                                                     | [Messaging Spans]                                                                      |
 | [Vert.x RxJava2](https://vertx.io/docs/vertx-rx/java2/)                                                                                     | 3.5+                          | N/A                                                                                                                                                                                                                                                                                                                                                                                     | context propagation only                                                               |
 | [Vibur DBCP](https://www.vibur.org/)                                                                                                        | 11.0+                         | [opentelemetry-vibur-dbcp-11.0](../instrumentation/vibur-dbcp-11.0/library)                                                                                                                                                                                                                                                                                                             | [Database Pool Metrics]                                                                |
+| [ZIO](https://zio.dev/)                                                                                                                     | 2.0.0+                        | N/A                                                                                                                                                                                                                                                                                                                                                                                     | Context propagation                                                                    |
 
 **[1]** Standalone library instrumentation refers to instrumentation that can be used without the Java agent.
 

+ 43 - 0
instrumentation/zio/zio-2.0/javaagent/build.gradle.kts

@@ -0,0 +1,43 @@
+plugins {
+  id("otel.javaagent-instrumentation")
+  id("otel.nullaway-conventions")
+  id("otel.scala-conventions")
+}
+
+val zioVersion = "2.0.0"
+val scalaVersion = "2.12"
+
+muzzle {
+  pass {
+    group.set("dev.zio")
+    module.set("zio_2.12")
+    versions.set("[$zioVersion,)")
+    assertInverse.set(true)
+  }
+  pass {
+    group.set("dev.zio")
+    module.set("zio_2.13")
+    versions.set("[$zioVersion,)")
+    assertInverse.set(true)
+  }
+  pass {
+    group.set("dev.zio")
+    module.set("zio_3")
+    versions.set("[$zioVersion,)")
+    assertInverse.set(true)
+  }
+}
+
+dependencies {
+  compileOnly("dev.zio:zio_$scalaVersion:$zioVersion")
+
+  testImplementation("dev.zio:zio_$scalaVersion:$zioVersion")
+
+  latestDepTestLibrary("dev.zio:zio_$scalaVersion:+")
+}
+
+tasks {
+  withType<Test>().configureEach {
+    jvmArgs("-Dio.opentelemetry.javaagent.shaded.io.opentelemetry.context.enableStrictContext=false")
+  }
+}

+ 35 - 0
instrumentation/zio/zio-2.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/zio/v2_0/FiberContext.java

@@ -0,0 +1,35 @@
+/*
+ * Copyright The OpenTelemetry Authors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package io.opentelemetry.javaagent.instrumentation.zio.v2_0;
+
+import io.opentelemetry.context.Context;
+
+public final class FiberContext {
+  private Context context;
+
+  private FiberContext(Context context) {
+    this.context = context;
+  }
+
+  public static FiberContext create() {
+    return new FiberContext(Context.current());
+  }
+
+  public void onSuspend() {
+    this.context = Context.current();
+
+    // Reset context to avoid leaking it to other fibers
+    Context.root().makeCurrent();
+  }
+
+  public void onResume() {
+    // Not using returned Scope because we can't reliably close it. If fiber also opens a Scope and
+    // does not close it before onSuspend is called then the attempt to close the scope returned
+    // here would not work because it is not the current scope.
+    // See https://github.com/open-telemetry/opentelemetry-java/issues/5303
+    this.context.makeCurrent();
+  }
+}

+ 64 - 0
instrumentation/zio/zio-2.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/zio/v2_0/TracingSupervisor.java

@@ -0,0 +1,64 @@
+/*
+ * Copyright The OpenTelemetry Authors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package io.opentelemetry.javaagent.instrumentation.zio.v2_0;
+
+import io.opentelemetry.instrumentation.api.util.VirtualField;
+import scala.Option;
+import zio.Exit;
+import zio.Fiber;
+import zio.Supervisor;
+import zio.Unsafe;
+import zio.ZEnvironment;
+import zio.ZIO;
+import zio.ZIO$;
+
+@SuppressWarnings("unchecked")
+public final class TracingSupervisor extends Supervisor<Object> {
+
+  @SuppressWarnings("rawtypes")
+  private final VirtualField<Fiber.Runtime, FiberContext> virtualField;
+
+  @SuppressWarnings("rawtypes")
+  public TracingSupervisor(VirtualField<Fiber.Runtime, FiberContext> virtualField) {
+    this.virtualField = virtualField;
+  }
+
+  @Override
+  @SuppressWarnings("rawtypes")
+  public ZIO value(Object trace) {
+    return ZIO$.MODULE$.unit();
+  }
+
+  @Override
+  public <R, E, A1> void onStart(
+      ZEnvironment<R> environment,
+      ZIO<R, E, A1> effect,
+      Option<Fiber.Runtime<Object, Object>> parent,
+      Fiber.Runtime<E, A1> fiber,
+      Unsafe unsafe) {
+    FiberContext context = FiberContext.create();
+    virtualField.set(fiber, context);
+  }
+
+  @Override
+  public <R, E, A1> void onEnd(Exit<E, A1> value, Fiber.Runtime<E, A1> fiber, Unsafe unsafe) {}
+
+  @Override
+  public <E, A1> void onSuspend(Fiber.Runtime<E, A1> fiber, Unsafe unsafe) {
+    FiberContext context = virtualField.get(fiber);
+    if (context != null) {
+      context.onSuspend();
+    }
+  }
+
+  @Override
+  public <E, A1> void onResume(Fiber.Runtime<E, A1> fiber, Unsafe unsafe) {
+    FiberContext context = virtualField.get(fiber);
+    if (context != null) {
+      context.onResume();
+    }
+  }
+}

+ 21 - 0
instrumentation/zio/zio-2.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/zio/v2_0/ZioIgnoredTypesConfigurer.java

@@ -0,0 +1,21 @@
+/*
+ * Copyright The OpenTelemetry Authors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package io.opentelemetry.javaagent.instrumentation.zio.v2_0;
+
+import com.google.auto.service.AutoService;
+import io.opentelemetry.javaagent.extension.ignore.IgnoredTypesBuilder;
+import io.opentelemetry.javaagent.extension.ignore.IgnoredTypesConfigurer;
+import io.opentelemetry.sdk.autoconfigure.spi.ConfigProperties;
+
+@AutoService(IgnoredTypesConfigurer.class)
+public class ZioIgnoredTypesConfigurer implements IgnoredTypesConfigurer {
+
+  @Override
+  public void configure(IgnoredTypesBuilder builder, ConfigProperties config) {
+    // context is propagated using FiberContext
+    builder.ignoreTaskClass("zio.internal.FiberRuntime");
+  }
+}

+ 26 - 0
instrumentation/zio/zio-2.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/zio/v2_0/ZioInstrumentationModule.java

@@ -0,0 +1,26 @@
+/*
+ * Copyright The OpenTelemetry Authors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package io.opentelemetry.javaagent.instrumentation.zio.v2_0;
+
+import static java.util.Collections.singletonList;
+
+import com.google.auto.service.AutoService;
+import io.opentelemetry.javaagent.extension.instrumentation.InstrumentationModule;
+import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
+import java.util.List;
+
+@AutoService(InstrumentationModule.class)
+public class ZioInstrumentationModule extends InstrumentationModule {
+
+  public ZioInstrumentationModule() {
+    super("zio");
+  }
+
+  @Override
+  public List<TypeInstrumentation> typeInstrumentations() {
+    return singletonList(new ZioRuntimeInstrumentation());
+  }
+}

+ 46 - 0
instrumentation/zio/zio-2.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/zio/v2_0/ZioRuntimeInstrumentation.java

@@ -0,0 +1,46 @@
+/*
+ * Copyright The OpenTelemetry Authors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package io.opentelemetry.javaagent.instrumentation.zio.v2_0;
+
+import static net.bytebuddy.matcher.ElementMatchers.isMethod;
+import static net.bytebuddy.matcher.ElementMatchers.named;
+
+import io.opentelemetry.instrumentation.api.util.VirtualField;
+import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
+import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer;
+import net.bytebuddy.asm.Advice;
+import net.bytebuddy.description.type.TypeDescription;
+import net.bytebuddy.matcher.ElementMatcher;
+import zio.Fiber;
+import zio.Supervisor;
+
+public class ZioRuntimeInstrumentation implements TypeInstrumentation {
+
+  @Override
+  public ElementMatcher<TypeDescription> typeMatcher() {
+    return named("zio.Runtime$");
+  }
+
+  @Override
+  public void transform(TypeTransformer transformer) {
+    transformer.applyAdviceToMethod(
+        isMethod().and(named("defaultSupervisor")), getClass().getName() + "$DefaultSupervisor");
+  }
+
+  @SuppressWarnings("unused")
+  public static final class DefaultSupervisor {
+
+    private DefaultSupervisor() {}
+
+    @Advice.OnMethodExit(suppress = Throwable.class)
+    public static void onExit(@Advice.Return(readOnly = false) Supervisor<?> supervisor) {
+      @SuppressWarnings("rawtypes")
+      VirtualField<Fiber.Runtime, FiberContext> virtualField =
+          VirtualField.find(Fiber.Runtime.class, FiberContext.class);
+      supervisor = supervisor.$plus$plus(new TracingSupervisor(virtualField));
+    }
+  }
+}

+ 8 - 0
instrumentation/zio/zio-2.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/zio/v2_0/package-info.java

@@ -0,0 +1,8 @@
+@DefaultQualifier(
+    value = NonNull.class,
+    locations = {TypeUseLocation.FIELD, TypeUseLocation.PARAMETER, TypeUseLocation.RETURN})
+package io.opentelemetry.javaagent.instrumentation.zio.v2_0;
+
+import org.checkerframework.checker.nullness.qual.NonNull;
+import org.checkerframework.framework.qual.DefaultQualifier;
+import org.checkerframework.framework.qual.TypeUseLocation;

+ 132 - 0
instrumentation/zio/zio-2.0/javaagent/src/test/scala/io/opentelemetry/javaagent/instrumentation/zio/v2_0/ZioRuntimeInstrumentationTest.scala

@@ -0,0 +1,132 @@
+/*
+ * Copyright The OpenTelemetry Authors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package io.opentelemetry.javaagent.instrumentation.zio.v2_0
+
+import io.opentelemetry.instrumentation.testing.junit._
+import io.opentelemetry.instrumentation.testing.util.TelemetryDataUtil.orderByRootSpanName
+import io.opentelemetry.javaagent.instrumentation.zio.v2_0.ZioTestFixtures._
+import io.opentelemetry.sdk.testing.assertj.{SpanDataAssert, TraceAssert}
+import io.opentelemetry.sdk.trace.data.SpanData
+import org.junit.jupiter.api.extension.RegisterExtension
+import org.junit.jupiter.api.{Test, TestInstance}
+
+import java.util
+import java.util.function.Consumer
+
+@TestInstance(TestInstance.Lifecycle.PER_CLASS)
+class ZioRuntimeInstrumentationTest {
+
+  @RegisterExtension
+  val testing: InstrumentationExtension = AgentInstrumentationExtension.create()
+
+  @Test
+  def traceIsPropagatedToChildFiber(): Unit = {
+    runNestedFibers()
+
+    testing.waitAndAssertTraces(
+      assertTrace { trace =>
+        trace.hasSpansSatisfyingExactly(
+          assertSpan(_.hasName("fiber_1_span_1").hasNoParent),
+          assertSpan(_.hasName("fiber_2_span_1").hasParent(trace.getSpan(0)))
+        )
+      }
+    )
+  }
+
+  @Test
+  def traceIsPreservedWhenFiberIsInterrupted(): Unit = {
+    runInterruptedFiber()
+
+    testing.waitAndAssertTraces(
+      assertTrace { trace =>
+        trace.hasSpansSatisfyingExactly(
+          assertSpan(_.hasName("fiber_1_span_1").hasNoParent),
+          assertSpan(_.hasName("fiber_2_span_1").hasParent(trace.getSpan(0)))
+        )
+      }
+    )
+  }
+
+  @Test
+  def synchronizedFibersDoNotInterfereWithEachOthersTraces(): Unit = {
+    runSynchronizedFibers()
+
+    testing.waitAndAssertTraces(
+      assertTrace { trace =>
+        trace.hasSpansSatisfyingExactly(
+          assertSpan(_.hasName("fiber_1_span_1").hasNoParent),
+          assertSpan(_.hasName("fiber_1_span_2").hasParent(trace.getSpan(0)))
+        )
+      },
+      assertTrace { trace =>
+        trace.hasSpansSatisfyingExactly(
+          assertSpan(_.hasName("fiber_2_span_1").hasNoParent),
+          assertSpan(_.hasName("fiber_2_span_2").hasParent(trace.getSpan(0)))
+        )
+      }
+    )
+  }
+
+  @Test
+  def concurrentFibersDoNotInterfereWithEachOthersTraces(): Unit = {
+    runConcurrentFibers()
+
+    testing.waitAndAssertSortedTraces(
+      orderByRootSpanName("fiber_1_span_1", "fiber_2_span_1", "fiber_3_span_1"),
+      assertTrace { trace =>
+        trace.hasSpansSatisfyingExactly(
+          assertSpan(_.hasName("fiber_1_span_1").hasNoParent),
+          assertSpan(_.hasName("fiber_1_span_2").hasParent(trace.getSpan(0)))
+        )
+      },
+      assertTrace { trace =>
+        trace.hasSpansSatisfyingExactly(
+          assertSpan(_.hasName("fiber_2_span_1").hasNoParent),
+          assertSpan(_.hasName("fiber_2_span_2").hasParent(trace.getSpan(0)))
+        )
+      },
+      assertTrace { trace =>
+        trace.hasSpansSatisfyingExactly(
+          assertSpan(_.hasName("fiber_3_span_1").hasNoParent),
+          assertSpan(_.hasName("fiber_3_span_2").hasParent(trace.getSpan(0)))
+        )
+      }
+    )
+  }
+
+  @Test
+  def sequentialFibersDoNotInterfereWithEachOthersTraces(): Unit = {
+    runSequentialFibers()
+
+    testing.waitAndAssertTraces(
+      assertTrace { trace =>
+        trace.hasSpansSatisfyingExactly(
+          assertSpan(_.hasName("fiber_1_span_1").hasNoParent),
+          assertSpan(_.hasName("fiber_1_span_2").hasParent(trace.getSpan(0)))
+        )
+      },
+      assertTrace { trace =>
+        trace.hasSpansSatisfyingExactly(
+          assertSpan(_.hasName("fiber_2_span_1").hasNoParent),
+          assertSpan(_.hasName("fiber_2_span_2").hasParent(trace.getSpan(0)))
+        )
+      },
+      assertTrace { trace =>
+        trace.hasSpansSatisfyingExactly(
+          assertSpan(_.hasName("fiber_3_span_1").hasNoParent),
+          assertSpan(_.hasName("fiber_3_span_2").hasParent(trace.getSpan(0)))
+        )
+      }
+    )
+  }
+
+  private def assertTrace(f: TraceAssert => Any): Consumer[TraceAssert] =
+    (t: TraceAssert) => f(t)
+
+  private def assertSpan(f: SpanDataAssert => Any): Consumer[SpanDataAssert] =
+    (t: SpanDataAssert) => f(t)
+
+}

+ 158 - 0
instrumentation/zio/zio-2.0/javaagent/src/test/scala/io/opentelemetry/javaagent/instrumentation/zio/v2_0/ZioTestFixtures.scala

@@ -0,0 +1,158 @@
+/*
+ * Copyright The OpenTelemetry Authors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package io.opentelemetry.javaagent.instrumentation.zio.v2_0
+
+import io.opentelemetry.api.GlobalOpenTelemetry
+import io.opentelemetry.api.trace.Tracer
+import zio._
+
+import java.util.concurrent.Executors
+
+object ZioTestFixtures {
+
+  def runNestedFibers(): Unit =
+    run {
+      childSpan("fiber_1_span_1") {
+        for {
+          child <- childSpan("fiber_2_span_1")(ZIO.unit).fork
+          _ <- child.join
+        } yield ()
+      }
+    }
+
+  def runInterruptedFiber(): Unit =
+    run {
+      for {
+        childStarted <- Promise.make[Nothing, Unit]
+        _ <- childSpan("fiber_1_span_1") {
+          for {
+            child <- childSpan("fiber_2_span_1") {
+              childStarted.succeed(()) *>
+                ZIO.never
+            }.fork
+            _ <- childStarted.await
+            _ <- child.interrupt
+          } yield ()
+        }
+      } yield ()
+    }
+
+  def runSynchronizedFibers(): Unit = {
+    def runFiber(
+        fiberNumber: Int,
+        onStart: UIO[Unit],
+        onEnd: UIO[Unit]
+    ): ZIO[Any, Nothing, Any] =
+      childSpan(s"fiber_${fiberNumber}_span_1") {
+        onStart *>
+          childSpan(s"fiber_${fiberNumber}_span_2") {
+            onEnd
+          }
+      }
+
+    run {
+      for {
+        fiber1Started <- Promise.make[Nothing, Unit]
+        fiber2Done <- Promise.make[Nothing, Unit]
+
+        fiber1 <- runFiber(
+          fiberNumber = 1,
+          onStart = fiber1Started.succeed(()) *> fiber2Done.await,
+          onEnd = ZIO.unit
+        ).fork
+
+        fiber2 <- runFiber(
+          fiberNumber = 2,
+          onStart = fiber1Started.await,
+          onEnd = fiber2Done.succeed(()).unit
+        ).fork
+
+        _ <- Fiber.joinAll(List(fiber1, fiber2))
+      } yield ()
+    }
+  }
+
+  def runConcurrentFibers(): Unit = {
+    def runFiber(
+        fiberNumber: Int,
+        start: Promise[Nothing, Unit]
+    ): ZIO[Any, Nothing, Unit] = {
+      start.await *>
+        childSpan(s"fiber_${fiberNumber}_span_1") {
+          ZIO.yieldNow *>
+            childSpan(s"fiber_${fiberNumber}_span_2") {
+              ZIO.yieldNow
+            }
+        }
+    }
+
+    run {
+      for {
+        start <- Promise.make[Nothing, Unit]
+        fiber1 <- runFiber(1, start).fork
+        fiber2 <- runFiber(2, start).fork
+        fiber3 <- runFiber(3, start).fork
+        _ <- start.succeed(())
+        _ <- Fiber.joinAll(List(fiber1, fiber2, fiber3))
+      } yield ()
+    }
+  }
+
+  def runSequentialFibers(): Unit = {
+    def runFiber(fiberNumber: Int): ZIO[Any, Nothing, Unit] = {
+      childSpan(s"fiber_${fiberNumber}_span_1") {
+        childSpan(s"fiber_${fiberNumber}_span_2") {
+          ZIO.unit
+        }
+      }
+    }
+
+    run {
+      for {
+        fiber1 <- runFiber(1).fork
+        _ <- fiber1.join
+        fiber2 <- runFiber(2).fork
+        _ <- fiber2.join
+        fiber3 <- runFiber(3).fork
+        _ <- fiber3.join
+      } yield ()
+    }
+  }
+
+  private val tracer: Tracer = GlobalOpenTelemetry.getTracer("test")
+
+  private def childSpan(opName: String)(op: UIO[Unit]): UIO[Unit] =
+    ZIO.scoped {
+      for {
+        scope <- ZIO.scope
+        otelSpan <- ZIO.succeed(tracer.spanBuilder(opName).startSpan())
+        otelScope <- ZIO.succeed(otelSpan.makeCurrent())
+        _ <- scope.addFinalizer(ZIO.succeed(otelSpan.end()))
+        _ <- scope.addFinalizer(ZIO.succeed(otelScope.close()))
+        _ <- op
+      } yield ()
+    }
+
+  private def run[A](zio: ZIO[Any, Nothing, A]): Unit = {
+    val executor = Executors.newSingleThreadExecutor()
+    val zioExecutor = Executor.fromJavaExecutor(executor)
+    val layer =
+      Runtime.setExecutor(zioExecutor) >>>
+        Runtime.setBlockingExecutor(zioExecutor)
+    try {
+      Unsafe.unsafe { implicit unsafe =>
+        Runtime.unsafe
+          .fromLayer(layer)
+          .unsafe
+          .run(zio)
+          .getOrThrowFiberFailure()
+      }
+    } finally {
+      executor.shutdownNow()
+    }
+  }
+
+}

+ 1 - 0
settings.gradle.kts

@@ -524,6 +524,7 @@ hideFromDependabot(":instrumentation:vibur-dbcp-11.0:javaagent")
 hideFromDependabot(":instrumentation:vibur-dbcp-11.0:library")
 hideFromDependabot(":instrumentation:vibur-dbcp-11.0:testing")
 hideFromDependabot(":instrumentation:wicket-8.0:javaagent")
+hideFromDependabot(":instrumentation:zio:zio-2.0:javaagent")
 
 // benchmark
 include(":benchmark-overhead-jmh")