Browse Source

Migrate scala executor tests to Java (#5618)

Anuraag Agrawal 3 years ago
parent
commit
9f3ad23a9a
16 changed files with 332 additions and 507 deletions
  1. 2 0
      instrumentation/akka/akka-actor-fork-join-2.5/javaagent/build.gradle.kts
  2. 0 150
      instrumentation/akka/akka-actor-fork-join-2.5/javaagent/src/test/groovy/AkkaExecutorInstrumentationTest.groovy
  3. 13 8
      instrumentation/akka/akka-actor-fork-join-2.5/javaagent/src/test/java/io/opentelemetry/instrumentation/akkaactor/AkkaAsyncChild.java
  4. 40 0
      instrumentation/akka/akka-actor-fork-join-2.5/javaagent/src/test/java/io/opentelemetry/instrumentation/akkaactor/AkkaExecutorInstrumentationTest.java
  5. 8 0
      instrumentation/executors/javaagent/build.gradle.kts
  6. 17 165
      instrumentation/executors/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/javaconcurrent/ExecutorInstrumentationTest.java
  7. 4 7
      instrumentation/executors/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/javaconcurrent/JavaAsyncChild.java
  8. 0 19
      instrumentation/executors/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/javaconcurrent/LambdaGen.java
  9. 9 0
      instrumentation/executors/testing/build.gradle.kts
  10. 168 0
      instrumentation/executors/testing/src/main/java/io/opentelemetry/javaagent/instrumentation/javaconcurrent/AbstractExecutorServiceTest.java
  11. 15 0
      instrumentation/executors/testing/src/main/java/io/opentelemetry/javaagent/instrumentation/javaconcurrent/TestTask.java
  12. 2 0
      instrumentation/scala-fork-join-2.8/javaagent/build.gradle.kts
  13. 0 150
      instrumentation/scala-fork-join-2.8/javaagent/src/test/groovy/ScalaExecutorInstrumentationTest.groovy
  14. 13 8
      instrumentation/scala-fork-join-2.8/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/scalaexecutors/ScalaAsyncChild.java
  15. 40 0
      instrumentation/scala-fork-join-2.8/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/scalaexecutors/ScalaExecutorInstrumentationTest.java
  16. 1 0
      settings.gradle.kts

+ 2 - 0
instrumentation/akka/akka-actor-fork-join-2.5/javaagent/build.gradle.kts

@@ -13,4 +13,6 @@ muzzle {
 
 dependencies {
   library("com.typesafe.akka:akka-actor_2.11:2.5.0")
+
+  testImplementation(project(":instrumentation:executors:testing"))
 }

+ 0 - 150
instrumentation/akka/akka-actor-fork-join-2.5/javaagent/src/test/groovy/AkkaExecutorInstrumentationTest.groovy

@@ -1,150 +0,0 @@
-/*
- * Copyright The OpenTelemetry Authors
- * SPDX-License-Identifier: Apache-2.0
- */
-
-import akka.dispatch.forkjoin.ForkJoinPool
-import akka.dispatch.forkjoin.ForkJoinTask
-import io.opentelemetry.api.trace.SpanKind
-import io.opentelemetry.instrumentation.test.AgentInstrumentationSpecification
-import spock.lang.Shared
-
-import java.lang.reflect.InvocationTargetException
-import java.util.concurrent.ArrayBlockingQueue
-import java.util.concurrent.Callable
-import java.util.concurrent.Future
-import java.util.concurrent.RejectedExecutionException
-import java.util.concurrent.ThreadPoolExecutor
-import java.util.concurrent.TimeUnit
-
-/**
- * Test executor instrumentation for Akka specific classes.
- * This is to large extent a copy of ExecutorInstrumentationTest.
- */
-class AkkaExecutorInstrumentationTest extends AgentInstrumentationSpecification {
-
-  @Shared
-  def executeRunnable = { e, c -> e.execute((Runnable) c) }
-  @Shared
-  def akkaExecuteForkJoinTask = { e, c -> e.execute((ForkJoinTask) c) }
-  @Shared
-  def submitRunnable = { e, c -> e.submit((Runnable) c) }
-  @Shared
-  def submitCallable = { e, c -> e.submit((Callable) c) }
-  @Shared
-  def akkaSubmitForkJoinTask = { e, c -> e.submit((ForkJoinTask) c) }
-  @Shared
-  def akkaInvokeForkJoinTask = { e, c -> e.invoke((ForkJoinTask) c) }
-
-  def "#poolName '#testName' propagates"() {
-    setup:
-    def pool = poolImpl
-    def m = method
-
-    new Runnable() {
-      @Override
-      void run() {
-        runWithSpan("parent") {
-          // this child will have a span
-          def child1 = new AkkaAsyncChild()
-          // this child won't
-          def child2 = new AkkaAsyncChild(false, false)
-          m(pool, child1)
-          m(pool, child2)
-          child1.waitForCompletion()
-          child2.waitForCompletion()
-        }
-      }
-    }.run()
-
-    expect:
-    assertTraces(1) {
-      trace(0, 2) {
-        span(0) {
-          name "parent"
-          kind SpanKind.INTERNAL
-          hasNoParent()
-        }
-        span(1) {
-          name "asyncChild"
-          kind SpanKind.INTERNAL
-          childOf(span(0))
-        }
-      }
-    }
-
-    cleanup:
-    pool?.shutdown()
-    pool?.awaitTermination(10, TimeUnit.SECONDS)
-
-    // Unfortunately, there's no simple way to test the cross product of methods/pools.
-    where:
-    testName               | method                  | poolImpl
-    "execute Runnable"     | executeRunnable         | new ThreadPoolExecutor(1, 1, 1000, TimeUnit.NANOSECONDS, new ArrayBlockingQueue<Runnable>(1))
-    "submit Runnable"      | submitRunnable          | new ThreadPoolExecutor(1, 1, 1000, TimeUnit.NANOSECONDS, new ArrayBlockingQueue<Runnable>(1))
-    "submit Callable"      | submitCallable          | new ThreadPoolExecutor(1, 1, 1000, TimeUnit.NANOSECONDS, new ArrayBlockingQueue<Runnable>(1))
-
-    // ForkJoinPool has additional set of method overloads for ForkJoinTask to deal with
-    "execute Runnable"     | executeRunnable         | new ForkJoinPool()
-    "execute ForkJoinTask" | akkaExecuteForkJoinTask | new ForkJoinPool()
-    "submit Runnable"      | submitRunnable          | new ForkJoinPool()
-    "submit Callable"      | submitCallable          | new ForkJoinPool()
-    "submit ForkJoinTask"  | akkaSubmitForkJoinTask  | new ForkJoinPool()
-    "invoke ForkJoinTask"  | akkaInvokeForkJoinTask  | new ForkJoinPool()
-    poolName = poolImpl.class.name
-  }
-
-  def "ForkJoinPool '#name' reports after canceled jobs"() {
-    setup:
-    def pool = poolImpl
-    def m = method
-    List<AkkaAsyncChild> children = new ArrayList<>()
-    List<Future> jobFutures = new ArrayList<>()
-
-    new Runnable() {
-      @Override
-      void run() {
-        runWithSpan("parent") {
-          try {
-            for (int i = 0; i < 20; ++i) {
-              // Our current instrumentation instrumentation does not behave very well
-              // if we try to reuse Callable/Runnable. Namely we would be getting 'orphaned'
-              // child traces sometimes since state can contain only one parent span - and
-              // we do not really have a good way for attributing work to correct parent span
-              // if we reuse Callable/Runnable.
-              // Solution for now is to never reuse a Callable/Runnable.
-              AkkaAsyncChild child = new AkkaAsyncChild(false, true)
-              children.add(child)
-              try {
-                Future f = m(pool, child)
-                jobFutures.add(f)
-              } catch (InvocationTargetException e) {
-                throw e.getCause()
-              }
-            }
-          } catch (RejectedExecutionException ignored) {
-          }
-
-          for (Future f : jobFutures) {
-            f.cancel(false)
-          }
-          for (AkkaAsyncChild child : children) {
-            child.unblock()
-          }
-        }
-      }
-    }.run()
-
-    expect:
-    waitForTraces(1).size() == 1
-
-    cleanup:
-    pool?.shutdown()
-    pool?.awaitTermination(10, TimeUnit.SECONDS)
-
-    where:
-    name              | method         | poolImpl
-    "submit Runnable" | submitRunnable | new ForkJoinPool()
-    "submit Callable" | submitCallable | new ForkJoinPool()
-  }
-}

+ 13 - 8
instrumentation/akka/akka-actor-fork-join-2.5/javaagent/src/test/java/AkkaAsyncChild.java → instrumentation/akka/akka-actor-fork-join-2.5/javaagent/src/test/java/io/opentelemetry/instrumentation/akkaactor/AkkaAsyncChild.java

@@ -3,24 +3,22 @@
  * SPDX-License-Identifier: Apache-2.0
  */
 
+package io.opentelemetry.instrumentation.akkaactor;
+
 import akka.dispatch.forkjoin.ForkJoinTask;
 import io.opentelemetry.api.GlobalOpenTelemetry;
 import io.opentelemetry.api.trace.Tracer;
-import java.util.concurrent.Callable;
+import io.opentelemetry.javaagent.instrumentation.javaconcurrent.TestTask;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.atomic.AtomicBoolean;
 
-public class AkkaAsyncChild extends ForkJoinTask<Object> implements Runnable, Callable<Object> {
+final class AkkaAsyncChild extends ForkJoinTask<Object> implements TestTask {
   private static final Tracer tracer = GlobalOpenTelemetry.getTracer("test");
 
   private final AtomicBoolean blockThread;
   private final boolean doTraceableWork;
   private final CountDownLatch latch = new CountDownLatch(1);
 
-  public AkkaAsyncChild() {
-    this(/* doTraceableWork= */ true, /* blockThread= */ false);
-  }
-
   public AkkaAsyncChild(boolean doTraceableWork, boolean blockThread) {
     this.doTraceableWork = doTraceableWork;
     this.blockThread = new AtomicBoolean(blockThread);
@@ -40,6 +38,7 @@ public class AkkaAsyncChild extends ForkJoinTask<Object> implements Runnable, Ca
     return true;
   }
 
+  @Override
   public void unblock() {
     blockThread.set(false);
   }
@@ -55,8 +54,14 @@ public class AkkaAsyncChild extends ForkJoinTask<Object> implements Runnable, Ca
     return null;
   }
 
-  public void waitForCompletion() throws InterruptedException {
-    latch.await();
+  @Override
+  public void waitForCompletion() {
+    try {
+      latch.await();
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      throw new AssertionError(e);
+    }
   }
 
   private void runImpl() {

+ 40 - 0
instrumentation/akka/akka-actor-fork-join-2.5/javaagent/src/test/java/io/opentelemetry/instrumentation/akkaactor/AkkaExecutorInstrumentationTest.java

@@ -0,0 +1,40 @@
+/*
+ * Copyright The OpenTelemetry Authors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package io.opentelemetry.instrumentation.akkaactor;
+
+import akka.dispatch.forkjoin.ForkJoinPool;
+import akka.dispatch.forkjoin.ForkJoinTask;
+import io.opentelemetry.instrumentation.testing.junit.AgentInstrumentationExtension;
+import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension;
+import io.opentelemetry.javaagent.instrumentation.javaconcurrent.AbstractExecutorServiceTest;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
+
+class AkkaExecutorInstrumentationTest
+    extends AbstractExecutorServiceTest<ForkJoinPool, AkkaAsyncChild> {
+
+  @RegisterExtension
+  static final InstrumentationExtension testing = AgentInstrumentationExtension.create();
+
+  AkkaExecutorInstrumentationTest() {
+    super(new ForkJoinPool(), testing);
+  }
+
+  @Override
+  protected AkkaAsyncChild newTask(boolean doTraceableWork, boolean blockThread) {
+    return new AkkaAsyncChild(doTraceableWork, blockThread);
+  }
+
+  @Test
+  void invokeForkJoinTask() {
+    executeTwoTasks(task -> executor().invoke((ForkJoinTask<?>) task));
+  }
+
+  @Test
+  void submitForkJoinTask() {
+    executeTwoTasks(task -> executor().submit((ForkJoinTask<?>) task));
+  }
+}

+ 8 - 0
instrumentation/executors/javaagent/build.gradle.kts

@@ -8,6 +8,10 @@ muzzle {
   }
 }
 
+dependencies {
+  testImplementation(project(":instrumentation:executors:testing"))
+}
+
 testing {
   suites {
     // CompletableFuture behaves differently if ForkJoinPool has no parallelism
@@ -18,6 +22,10 @@ testing {
         }
       }
 
+      dependencies {
+        implementation(project(":instrumentation:executors:testing"))
+      }
+
       targets {
         all {
           testTask.configure {

+ 17 - 165
instrumentation/executors/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/javaconcurrent/ExecutorInstrumentationTest.java

@@ -5,10 +5,8 @@
 
 package io.opentelemetry.javaagent.instrumentation.javaconcurrent;
 
-import io.opentelemetry.api.trace.SpanKind;
 import io.opentelemetry.instrumentation.testing.junit.AgentInstrumentationExtension;
 import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension;
-import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
@@ -24,32 +22,32 @@ import java.util.concurrent.RunnableFuture;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
-import java.util.function.Function;
-import org.junit.jupiter.api.AfterAll;
-import org.junit.jupiter.api.Nested;
 import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.TestInstance;
 import org.junit.jupiter.api.extension.RegisterExtension;
-import org.junit.jupiter.api.function.ThrowingConsumer;
 
-@SuppressWarnings("ClassCanBeStatic")
-class ExecutorInstrumentationTest {
+abstract class ExecutorInstrumentationTest<T extends ExecutorService>
+    extends AbstractExecutorServiceTest<T, JavaAsyncChild> {
 
   @RegisterExtension
   static final InstrumentationExtension testing = AgentInstrumentationExtension.create();
 
-  @Nested
-  @TestInstance(TestInstance.Lifecycle.PER_CLASS)
-  class ThreadPoolExecutorTest extends AbstractExecutorServiceTest<ThreadPoolExecutor> {
+  ExecutorInstrumentationTest(T executor) {
+    super(executor, testing);
+  }
+
+  @Override
+  protected JavaAsyncChild newTask(boolean doTraceableWork, boolean blockThread) {
+    return new JavaAsyncChild(doTraceableWork, blockThread);
+  }
+
+  static class ThreadPoolExecutorTest extends ExecutorInstrumentationTest<ThreadPoolExecutor> {
     ThreadPoolExecutorTest() {
       super(new ThreadPoolExecutor(1, 1, 1000, TimeUnit.NANOSECONDS, new ArrayBlockingQueue<>(20)));
     }
   }
 
-  @Nested
-  @TestInstance(TestInstance.Lifecycle.PER_CLASS)
-  class ScheduledThreadPoolExecutorTest
-      extends AbstractExecutorServiceTest<ScheduledThreadPoolExecutor> {
+  static class ScheduledThreadPoolExecutorTest
+      extends ExecutorInstrumentationTest<ScheduledThreadPoolExecutor> {
     ScheduledThreadPoolExecutorTest() {
       super(new ScheduledThreadPoolExecutor(1));
     }
@@ -96,9 +94,7 @@ class ExecutorInstrumentationTest {
     }
   }
 
-  @Nested
-  @TestInstance(TestInstance.Lifecycle.PER_CLASS)
-  class ForkJoinPoolTest extends AbstractExecutorServiceTest<ForkJoinPool> {
+  static class ForkJoinPoolTest extends ExecutorInstrumentationTest<ForkJoinPool> {
     ForkJoinPoolTest() {
       super(new ForkJoinPool(20));
     }
@@ -115,157 +111,13 @@ class ExecutorInstrumentationTest {
   }
 
   // CustomThreadPoolExecutor would normally be disabled except enabled by system property.
-  @Nested
-  @TestInstance(TestInstance.Lifecycle.PER_CLASS)
-  class CustomThreadPoolExecutorTest extends AbstractExecutorServiceTest<CustomThreadPoolExecutor> {
+  static class CustomThreadPoolExecutorTest
+      extends ExecutorInstrumentationTest<CustomThreadPoolExecutor> {
     CustomThreadPoolExecutorTest() {
       super(new CustomThreadPoolExecutor());
     }
   }
 
-  abstract static class AbstractExecutorServiceTest<T extends ExecutorService> {
-    private final T executor;
-
-    AbstractExecutorServiceTest(T executor) {
-      this.executor = executor;
-    }
-
-    T executor() {
-      return executor;
-    }
-
-    @AfterAll
-    void shutdown() throws Exception {
-      executor.shutdown();
-      executor.awaitTermination(10, TimeUnit.SECONDS);
-    }
-
-    @Test
-    void executeRunnable() {
-      executeTwoTasks(executor::execute);
-    }
-
-    @Test
-    void submitRunnable() {
-      executeTwoTasks(task -> executor.submit((Runnable) task));
-    }
-
-    @Test
-    void submitCallable() {
-      executeTwoTasks(task -> executor.submit((Callable<?>) task));
-    }
-
-    @Test
-    void invokeAll() {
-      executeTwoTasks(task -> executor.invokeAll(Collections.singleton(task)));
-    }
-
-    @Test
-    void invokeAllWithTimeout() {
-      executeTwoTasks(
-          task -> executor.invokeAll(Collections.singleton(task), 10, TimeUnit.SECONDS));
-    }
-
-    @Test
-    void invokeAny() {
-      executeTwoTasks(task -> executor.invokeAny(Collections.singleton(task)));
-    }
-
-    @Test
-    void invokeAnyWithTimeout() {
-      executeTwoTasks(
-          task -> executor.invokeAny(Collections.singleton(task), 10, TimeUnit.SECONDS));
-    }
-
-    @Test
-    void executeLambdaRunnable() {
-      executeTwoTasks(task -> executor.execute(() -> task.run()));
-    }
-
-    @Test
-    void submitLambdaRunnable() {
-      executeTwoTasks(task -> executor.submit(() -> task.run()));
-    }
-
-    @Test
-    void submitLambdaCallable() {
-      executeTwoTasks(
-          task ->
-              executor.submit(
-                  () -> {
-                    task.run();
-                    return null;
-                  }));
-    }
-
-    @Test
-    void submitRunnableAndCancel() {
-      executeAndCancelTasks(task -> executor.submit((Runnable) task));
-    }
-
-    @Test
-    void submitCallableAndCancel() {
-      executeAndCancelTasks(task -> executor.submit((Callable<?>) task));
-    }
-  }
-
-  static void executeTwoTasks(ThrowingConsumer<JavaAsyncChild> task) {
-    testing.runWithSpan(
-        "parent",
-        () -> {
-          // this child will have a span
-          JavaAsyncChild child1 = new JavaAsyncChild();
-          // this child won't
-          JavaAsyncChild child2 = new JavaAsyncChild(false, false);
-          try {
-            task.accept(child1);
-            task.accept(child2);
-          } catch (Throwable t) {
-            throw new AssertionError(t);
-          }
-          child1.waitForCompletion();
-          child2.waitForCompletion();
-        });
-    testing.waitAndAssertTraces(
-        trace ->
-            trace.hasSpansSatisfyingExactly(
-                span -> span.hasName("parent").hasKind(SpanKind.INTERNAL).hasNoParent(),
-                span ->
-                    span.hasName("asyncChild")
-                        .hasKind(SpanKind.INTERNAL)
-                        .hasParent(trace.getSpan(0))));
-  }
-
-  static void executeAndCancelTasks(Function<JavaAsyncChild, Future<?>> task) {
-    List<JavaAsyncChild> children = new ArrayList<>();
-    List<Future<?>> jobFutures = new ArrayList<>();
-
-    testing.runWithSpan(
-        "parent",
-        () -> {
-          for (int i = 0; i < 20; i++) {
-            // Our current instrumentation instrumentation does not behave very well
-            // if we try to reuse Callable/Runnable. Namely we would be getting 'orphaned'
-            // child traces sometimes since state can contain only one parent span - and
-            // we do not really have a good way for attributing work to correct parent span
-            // if we reuse Callable/Runnable.
-            // Solution for now is to never reuse a Callable/Runnable.
-            JavaAsyncChild child = new JavaAsyncChild(true, true);
-            children.add(child);
-            Future<?> f = task.apply(child);
-            jobFutures.add(f);
-          }
-
-          jobFutures.forEach(f -> f.cancel(false));
-          children.forEach(JavaAsyncChild::unblock);
-        });
-
-    // Just check there is a single trace, this test is primarily to make sure that scopes aren't
-    // leak on
-    // cancellation.
-    testing.waitAndAssertTraces(trace -> {});
-  }
-
   @SuppressWarnings("RedundantOverride")
   private static class CustomThreadPoolExecutor extends AbstractExecutorService {
 

+ 4 - 7
instrumentation/executors/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/javaconcurrent/JavaAsyncChild.java

@@ -7,23 +7,18 @@ package io.opentelemetry.javaagent.instrumentation.javaconcurrent;
 
 import io.opentelemetry.api.GlobalOpenTelemetry;
 import io.opentelemetry.api.trace.Tracer;
-import java.util.concurrent.Callable;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ForkJoinTask;
 import java.util.concurrent.atomic.AtomicBoolean;
 
-public class JavaAsyncChild extends ForkJoinTask<Object> implements Runnable, Callable<Object> {
+final class JavaAsyncChild extends ForkJoinTask<Object> implements TestTask {
   private static final Tracer tracer = GlobalOpenTelemetry.getTracer("test");
 
   private final AtomicBoolean blockThread;
   private final boolean doTraceableWork;
   private final CountDownLatch latch = new CountDownLatch(1);
 
-  public JavaAsyncChild() {
-    this(/* doTraceableWork= */ true, /* blockThread= */ false);
-  }
-
-  public JavaAsyncChild(boolean doTraceableWork, boolean blockThread) {
+  JavaAsyncChild(boolean doTraceableWork, boolean blockThread) {
     this.doTraceableWork = doTraceableWork;
     this.blockThread = new AtomicBoolean(blockThread);
   }
@@ -42,6 +37,7 @@ public class JavaAsyncChild extends ForkJoinTask<Object> implements Runnable, Ca
     return true;
   }
 
+  @Override
   public void unblock() {
     blockThread.set(false);
   }
@@ -57,6 +53,7 @@ public class JavaAsyncChild extends ForkJoinTask<Object> implements Runnable, Ca
     return null;
   }
 
+  @Override
   public void waitForCompletion() {
     try {
       latch.await();

+ 0 - 19
instrumentation/executors/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/javaconcurrent/LambdaGen.java

@@ -1,19 +0,0 @@
-/*
- * Copyright The OpenTelemetry Authors
- * SPDX-License-Identifier: Apache-2.0
- */
-
-package io.opentelemetry.javaagent.instrumentation.javaconcurrent;
-
-import java.util.concurrent.Callable;
-
-class LambdaGen {
-
-  static Callable<?> wrapCallable(Callable<?> callable) {
-    return () -> callable.call();
-  }
-
-  static Runnable wrapRunnable(Runnable runnable) {
-    return () -> runnable.run();
-  }
-}

+ 9 - 0
instrumentation/executors/testing/build.gradle.kts

@@ -0,0 +1,9 @@
+plugins {
+  id("otel.java-conventions")
+}
+
+dependencies {
+  api("org.junit.jupiter:junit-jupiter-api")
+
+  implementation(project(":testing-common"))
+}

+ 168 - 0
instrumentation/executors/testing/src/main/java/io/opentelemetry/javaagent/instrumentation/javaconcurrent/AbstractExecutorServiceTest.java

@@ -0,0 +1,168 @@
+/*
+ * Copyright The OpenTelemetry Authors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package io.opentelemetry.javaagent.instrumentation.javaconcurrent;
+
+import io.opentelemetry.api.trace.SpanKind;
+import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInstance;
+import org.junit.jupiter.api.function.ThrowingConsumer;
+
+@TestInstance(TestInstance.Lifecycle.PER_CLASS)
+public abstract class AbstractExecutorServiceTest<T extends ExecutorService, U extends TestTask> {
+
+  private final T executor;
+  private final InstrumentationExtension testing;
+
+  protected AbstractExecutorServiceTest(T executor, InstrumentationExtension testing) {
+    this.executor = executor;
+    this.testing = testing;
+  }
+
+  protected abstract U newTask(boolean doTraceableWork, boolean blockThread);
+
+  protected T executor() {
+    return executor;
+  }
+
+  @AfterAll
+  void shutdown() throws InterruptedException {
+    executor.shutdown();
+    executor.awaitTermination(10, TimeUnit.SECONDS);
+  }
+
+  @Test
+  void executeRunnable() {
+    executeTwoTasks(executor::execute);
+  }
+
+  @Test
+  void submitRunnable() {
+    executeTwoTasks(task -> executor.submit((Runnable) task));
+  }
+
+  @Test
+  void submitCallable() {
+    executeTwoTasks(task -> executor.submit((Callable<?>) task));
+  }
+
+  @Test
+  void invokeAll() {
+    executeTwoTasks(task -> executor.invokeAll(Collections.singleton(task)));
+  }
+
+  @Test
+  void invokeAllWithTimeout() {
+    executeTwoTasks(task -> executor.invokeAll(Collections.singleton(task), 10, TimeUnit.SECONDS));
+  }
+
+  @Test
+  void invokeAny() {
+    executeTwoTasks(task -> executor.invokeAny(Collections.singleton(task)));
+  }
+
+  @Test
+  void invokeAnyWithTimeout() {
+    executeTwoTasks(task -> executor.invokeAny(Collections.singleton(task), 10, TimeUnit.SECONDS));
+  }
+
+  @Test
+  void executeLambdaRunnable() {
+    executeTwoTasks(task -> executor.execute(() -> task.run()));
+  }
+
+  @Test
+  void submitLambdaRunnable() {
+    executeTwoTasks(task -> executor.submit(() -> task.run()));
+  }
+
+  @Test
+  void submitLambdaCallable() {
+    executeTwoTasks(
+        task ->
+            executor.submit(
+                () -> {
+                  task.run();
+                  return null;
+                }));
+  }
+
+  @Test
+  void submitRunnableAndCancel() {
+    executeAndCancelTasks(task -> executor.submit((Runnable) task));
+  }
+
+  @Test
+  void submitCallableAndCancel() {
+    executeAndCancelTasks(task -> executor.submit((Callable<?>) task));
+  }
+
+  protected final void executeTwoTasks(ThrowingConsumer<U> task) {
+    testing.runWithSpan(
+        "parent",
+        () -> {
+          // this child will have a span
+          U child1 = newTask(true, false);
+          // this child won't
+          U child2 = newTask(false, false);
+          try {
+            task.accept(child1);
+            task.accept(child2);
+          } catch (Throwable t) {
+            throw new AssertionError(t);
+          }
+          child1.waitForCompletion();
+          child2.waitForCompletion();
+        });
+    testing.waitAndAssertTraces(
+        trace ->
+            trace.hasSpansSatisfyingExactly(
+                span -> span.hasName("parent").hasKind(SpanKind.INTERNAL).hasNoParent(),
+                span ->
+                    span.hasName("asyncChild")
+                        .hasKind(SpanKind.INTERNAL)
+                        .hasParent(trace.getSpan(0))));
+  }
+
+  protected final void executeAndCancelTasks(Function<U, Future<?>> task) {
+    List<U> children = new ArrayList<>();
+    List<Future<?>> jobFutures = new ArrayList<>();
+
+    testing.runWithSpan(
+        "parent",
+        () -> {
+          for (int i = 0; i < 20; i++) {
+            // Our current instrumentation instrumentation does not behave very well
+            // if we try to reuse Callable/Runnable. Namely we would be getting 'orphaned'
+            // child traces sometimes since state can contain only one parent span - and
+            // we do not really have a good way for attributing work to correct parent span
+            // if we reuse Callable/Runnable.
+            // Solution for now is to never reuse a Callable/Runnable.
+            U child = newTask(true, true);
+            children.add(child);
+            Future<?> f = task.apply(child);
+            jobFutures.add(f);
+          }
+
+          jobFutures.forEach(f -> f.cancel(false));
+          children.forEach(U::unblock);
+        });
+
+    // Just check there is a single trace, this test is primarily to make sure that scopes aren't
+    // leak on
+    // cancellation.
+    testing.waitAndAssertTraces(trace -> {});
+  }
+}

+ 15 - 0
instrumentation/executors/testing/src/main/java/io/opentelemetry/javaagent/instrumentation/javaconcurrent/TestTask.java

@@ -0,0 +1,15 @@
+/*
+ * Copyright The OpenTelemetry Authors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package io.opentelemetry.javaagent.instrumentation.javaconcurrent;
+
+import java.util.concurrent.Callable;
+
+public interface TestTask extends Runnable, Callable<Object> {
+
+  void unblock();
+
+  void waitForCompletion();
+}

+ 2 - 0
instrumentation/scala-fork-join-2.8/javaagent/build.gradle.kts

@@ -24,6 +24,8 @@ dependencies {
 
   testInstrumentation(project(":instrumentation:jdbc:javaagent"))
 
+  testImplementation(project(":instrumentation:executors:testing"))
+
   add("slickTestImplementation", project(":testing-common"))
   add("slickTestImplementation", "org.scala-lang:scala-library")
   add("slickTestImplementation", "com.typesafe.slick:slick_2.11:3.2.0")

+ 0 - 150
instrumentation/scala-fork-join-2.8/javaagent/src/test/groovy/ScalaExecutorInstrumentationTest.groovy

@@ -1,150 +0,0 @@
-/*
- * Copyright The OpenTelemetry Authors
- * SPDX-License-Identifier: Apache-2.0
- */
-
-import io.opentelemetry.api.trace.SpanKind
-import io.opentelemetry.instrumentation.test.AgentInstrumentationSpecification
-import scala.concurrent.forkjoin.ForkJoinPool
-import scala.concurrent.forkjoin.ForkJoinTask
-import spock.lang.Shared
-
-import java.lang.reflect.InvocationTargetException
-import java.util.concurrent.ArrayBlockingQueue
-import java.util.concurrent.Callable
-import java.util.concurrent.ExecutorService
-import java.util.concurrent.Future
-import java.util.concurrent.RejectedExecutionException
-import java.util.concurrent.ThreadPoolExecutor
-import java.util.concurrent.TimeUnit
-
-/**
- * Test executor instrumentation for Scala specific classes.
- * This is to large extent a copy of ExecutorInstrumentationTest.
- */
-class ScalaExecutorInstrumentationTest extends AgentInstrumentationSpecification {
-
-  @Shared
-  def executeRunnable = { e, c -> e.execute((Runnable) c) }
-  @Shared
-  def scalaExecuteForkJoinTask = { e, c -> e.execute((ForkJoinTask) c) }
-  @Shared
-  def submitRunnable = { e, c -> e.submit((Runnable) c) }
-  @Shared
-  def submitCallable = { e, c -> e.submit((Callable) c) }
-  @Shared
-  def scalaSubmitForkJoinTask = { e, c -> e.submit((ForkJoinTask) c) }
-  @Shared
-  def scalaInvokeForkJoinTask = { e, c -> e.invoke((ForkJoinTask) c) }
-
-  def "#poolImpl '#testName' propagates"() {
-    setup:
-    def pool = poolImpl
-    def m = method
-
-    new Runnable() {
-      @Override
-      void run() {
-        runWithSpan("parent") {
-          // this child will have a span
-          def child1 = new ScalaAsyncChild()
-          // this child won't
-          def child2 = new ScalaAsyncChild(false, false)
-          m(pool, child1)
-          m(pool, child2)
-          child1.waitForCompletion()
-          child2.waitForCompletion()
-        }
-      }
-    }.run()
-
-    expect:
-    assertTraces(1) {
-      trace(0, 2) {
-        span(0) {
-          name "parent"
-          kind SpanKind.INTERNAL
-          hasNoParent()
-        }
-        span(1) {
-          name "asyncChild"
-          kind SpanKind.INTERNAL
-          childOf span(0)
-        }
-      }
-    }
-
-    cleanup:
-    pool?.shutdown()
-
-    // Unfortunately, there's no simple way to test the cross product of methods/pools.
-    where:
-    testName               | method                   | poolImpl
-    "execute Runnable"     | executeRunnable          | new ThreadPoolExecutor(1, 1, 1000, TimeUnit.NANOSECONDS, new ArrayBlockingQueue<Runnable>(1))
-    "submit Runnable"      | submitRunnable           | new ThreadPoolExecutor(1, 1, 1000, TimeUnit.NANOSECONDS, new ArrayBlockingQueue<Runnable>(1))
-    "submit Callable"      | submitCallable           | new ThreadPoolExecutor(1, 1, 1000, TimeUnit.NANOSECONDS, new ArrayBlockingQueue<Runnable>(1))
-
-    // ForkJoinPool has additional set of method overloads for ForkJoinTask to deal with
-    "execute Runnable"     | executeRunnable          | new ForkJoinPool()
-    "execute ForkJoinTask" | scalaExecuteForkJoinTask | new ForkJoinPool()
-    "submit Runnable"      | submitRunnable           | new ForkJoinPool()
-    "submit Callable"      | submitCallable           | new ForkJoinPool()
-    "submit ForkJoinTask"  | scalaSubmitForkJoinTask  | new ForkJoinPool()
-    "invoke ForkJoinTask"  | scalaInvokeForkJoinTask  | new ForkJoinPool()
-  }
-
-  def "#poolImpl '#testName' reports after canceled jobs"() {
-    setup:
-    ExecutorService pool = poolImpl
-    def m = method
-    List<ScalaAsyncChild> children = new ArrayList<>()
-    List<Future> jobFutures = new ArrayList<>()
-
-    new Runnable() {
-      @Override
-      void run() {
-        runWithSpan("parent") {
-          try {
-            for (int i = 0; i < 20; ++i) {
-              // Our current instrumentation instrumentation does not behave very well
-              // if we try to reuse Callable/Runnable. Namely we would be getting 'orphaned'
-              // child traces sometimes since state can contain only one parent span - and
-              // we do not really have a good way for attributing work to correct parent span
-              // if we reuse Callable/Runnable.
-              // Solution for now is to never reuse a Callable/Runnable.
-              ScalaAsyncChild child = new ScalaAsyncChild(false, true)
-              children.add(child)
-              try {
-                Future f = m(pool, child)
-                jobFutures.add(f)
-              } catch (InvocationTargetException e) {
-                throw e.getCause()
-              }
-            }
-          } catch (RejectedExecutionException e) {
-          }
-
-          for (Future f : jobFutures) {
-            f.cancel(false)
-          }
-          for (ScalaAsyncChild child : children) {
-            child.unblock()
-          }
-        }
-      }
-    }.run()
-
-    expect:
-    waitForTraces(1).size() == 1
-
-    // Wait for shutdown to make sure any remaining tasks finish and cleanup context since we don't
-    // wait on the tasks.
-    pool.shutdown()
-    pool.awaitTermination(10, TimeUnit.SECONDS)
-
-    where:
-    testName          | method         | poolImpl
-    "submit Runnable" | submitRunnable | new ForkJoinPool()
-    "submit Callable" | submitCallable | new ForkJoinPool()
-  }
-}

+ 13 - 8
instrumentation/scala-fork-join-2.8/javaagent/src/test/java/ScalaAsyncChild.java → instrumentation/scala-fork-join-2.8/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/scalaexecutors/ScalaAsyncChild.java

@@ -3,24 +3,22 @@
  * SPDX-License-Identifier: Apache-2.0
  */
 
+package io.opentelemetry.javaagent.instrumentation.scalaexecutors;
+
 import io.opentelemetry.api.GlobalOpenTelemetry;
 import io.opentelemetry.api.trace.Tracer;
-import java.util.concurrent.Callable;
+import io.opentelemetry.javaagent.instrumentation.javaconcurrent.TestTask;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.atomic.AtomicBoolean;
 import scala.concurrent.forkjoin.ForkJoinTask;
 
-public class ScalaAsyncChild extends ForkJoinTask<Object> implements Runnable, Callable<Object> {
+final class ScalaAsyncChild extends ForkJoinTask<Object> implements TestTask {
   private static final Tracer tracer = GlobalOpenTelemetry.getTracer("test");
 
   private final AtomicBoolean blockThread;
   private final boolean doTraceableWork;
   private final CountDownLatch latch = new CountDownLatch(1);
 
-  public ScalaAsyncChild() {
-    this(/* doTraceableWork= */ true, /* blockThread= */ false);
-  }
-
   public ScalaAsyncChild(boolean doTraceableWork, boolean blockThread) {
     this.doTraceableWork = doTraceableWork;
     this.blockThread = new AtomicBoolean(blockThread);
@@ -40,6 +38,7 @@ public class ScalaAsyncChild extends ForkJoinTask<Object> implements Runnable, C
     return true;
   }
 
+  @Override
   public void unblock() {
     blockThread.set(false);
   }
@@ -55,8 +54,14 @@ public class ScalaAsyncChild extends ForkJoinTask<Object> implements Runnable, C
     return null;
   }
 
-  public void waitForCompletion() throws InterruptedException {
-    latch.await();
+  @Override
+  public void waitForCompletion() {
+    try {
+      latch.await();
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      throw new AssertionError(e);
+    }
   }
 
   private void runImpl() {

+ 40 - 0
instrumentation/scala-fork-join-2.8/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/scalaexecutors/ScalaExecutorInstrumentationTest.java

@@ -0,0 +1,40 @@
+/*
+ * Copyright The OpenTelemetry Authors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package io.opentelemetry.javaagent.instrumentation.scalaexecutors;
+
+import io.opentelemetry.instrumentation.testing.junit.AgentInstrumentationExtension;
+import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension;
+import io.opentelemetry.javaagent.instrumentation.javaconcurrent.AbstractExecutorServiceTest;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
+import scala.concurrent.forkjoin.ForkJoinPool;
+import scala.concurrent.forkjoin.ForkJoinTask;
+
+class ScalaExecutorInstrumentationTest
+    extends AbstractExecutorServiceTest<ForkJoinPool, ScalaAsyncChild> {
+
+  @RegisterExtension
+  static final InstrumentationExtension testing = AgentInstrumentationExtension.create();
+
+  ScalaExecutorInstrumentationTest() {
+    super(new ForkJoinPool(), testing);
+  }
+
+  @Override
+  protected ScalaAsyncChild newTask(boolean doTraceableWork, boolean blockThread) {
+    return new ScalaAsyncChild(doTraceableWork, blockThread);
+  }
+
+  @Test
+  void invokeForkJoinTask() {
+    executeTwoTasks(task -> executor().invoke((ForkJoinTask<?>) task));
+  }
+
+  @Test
+  void submitForkJoinTask() {
+    executeTwoTasks(task -> executor().submit((ForkJoinTask<?>) task));
+  }
+}

+ 1 - 0
settings.gradle.kts

@@ -191,6 +191,7 @@ include(":instrumentation:elasticsearch:elasticsearch-transport-5.0:javaagent")
 include(":instrumentation:elasticsearch:elasticsearch-transport-5.3:javaagent")
 include(":instrumentation:elasticsearch:elasticsearch-transport-6.0:javaagent")
 include(":instrumentation:executors:javaagent")
+include(":instrumentation:executors:testing")
 include(":instrumentation:external-annotations:javaagent")
 include(":instrumentation:external-annotations:javaagent-unit-tests")
 include(":instrumentation:finatra-2.9:javaagent")